Merge pull request #890 from crodas/feature/sql-base

Introduce `cdk-sql-common`
This commit is contained in:
C
2025-07-29 17:03:32 -03:00
committed by GitHub
77 changed files with 4592 additions and 3872 deletions

View File

@@ -54,6 +54,7 @@ cdk-fake-wallet = { path = "./crates/cdk-fake-wallet", version = "=0.11.0" }
cdk-payment-processor = { path = "./crates/cdk-payment-processor", default-features = true, version = "=0.11.0" }
cdk-mint-rpc = { path = "./crates/cdk-mint-rpc", version = "=0.11.0" }
cdk-redb = { path = "./crates/cdk-redb", default-features = true, version = "=0.11.0" }
cdk-sql-common = { path = "./crates/cdk-sql-common", default-features = true, version = "=0.11.0" }
cdk-sqlite = { path = "./crates/cdk-sqlite", default-features = true, version = "=0.11.0" }
cdk-signatory = { path = "./crates/cdk-signatory", version = "=0.11.0", default-features = false }
clap = { version = "4.5.31", features = ["derive"] }

View File

@@ -128,7 +128,7 @@ async fn main() -> Result<()> {
#[cfg(feature = "sqlcipher")]
let sql = {
match args.password {
Some(pass) => WalletSqliteDatabase::new(&sql_path, pass).await?,
Some(pass) => WalletSqliteDatabase::new((sql_path, pass)).await?,
None => bail!("Missing database password"),
}
};

View File

@@ -19,6 +19,80 @@ pub use mint::{MintAuthDatabase, MintAuthTransaction};
#[cfg(feature = "wallet")]
pub use wallet::Database as WalletDatabase;
/// Data conversion error
#[derive(thiserror::Error, Debug)]
pub enum ConversionError {
/// Missing columns
#[error("Not enough elements: expected {0}, got {1}")]
MissingColumn(usize, usize),
/// Missing parameter
#[error("Missing parameter {0}")]
MissingParameter(String),
/// Invalid db type
#[error("Invalid type from db, expected {0} got {1}")]
InvalidType(String, String),
/// Invalid data conversion in column
#[error("Error converting {1}, expecting type {0}")]
InvalidConversion(String, String),
/// Mint Url Error
#[error(transparent)]
MintUrl(#[from] crate::mint_url::Error),
/// NUT00 Error
#[error(transparent)]
CDKNUT00(#[from] crate::nuts::nut00::Error),
/// NUT01 Error
#[error(transparent)]
CDKNUT01(#[from] crate::nuts::nut01::Error),
/// NUT02 Error
#[error(transparent)]
CDKNUT02(#[from] crate::nuts::nut02::Error),
/// NUT04 Error
#[error(transparent)]
CDKNUT04(#[from] crate::nuts::nut04::Error),
/// NUT05 Error
#[error(transparent)]
CDKNUT05(#[from] crate::nuts::nut05::Error),
/// NUT07 Error
#[error(transparent)]
CDKNUT07(#[from] crate::nuts::nut07::Error),
/// NUT23 Error
#[error(transparent)]
CDKNUT23(#[from] crate::nuts::nut23::Error),
/// Secret Error
#[error(transparent)]
CDKSECRET(#[from] crate::secret::Error),
/// Serde Error
#[error(transparent)]
Serde(#[from] serde_json::Error),
/// BIP32 Error
#[error(transparent)]
BIP32(#[from] bitcoin::bip32::Error),
/// Generic error
#[error(transparent)]
Generic(#[from] Box<crate::Error>),
}
impl From<crate::Error> for ConversionError {
fn from(err: crate::Error) -> Self {
ConversionError::Generic(Box::new(err))
}
}
/// CDK_database error
#[derive(Debug, thiserror::Error)]
pub enum Error {
@@ -39,6 +113,9 @@ pub enum Error {
/// NUT00 Error
#[error(transparent)]
NUT00(#[from] crate::nuts::nut00::Error),
/// NUT01 Error
#[error(transparent)]
NUT01(#[from] crate::nuts::nut01::Error),
/// NUT02 Error
#[error(transparent)]
NUT02(#[from] crate::nuts::nut02::Error),
@@ -68,6 +145,38 @@ pub enum Error {
/// Invalid state transition
#[error("Invalid state transition")]
InvalidStateTransition(crate::state::Error),
/// Invalid connection settings
#[error("Invalid credentials {0}")]
InvalidConnectionSettings(String),
/// Unexpected database response
#[error("Invalid database response")]
InvalidDbResponse,
/// Internal error
#[error("Internal {0}")]
Internal(String),
/// Data conversion error
#[error(transparent)]
Conversion(#[from] ConversionError),
/// Missing Placeholder value
#[error("Missing placeholder value {0}")]
MissingPlaceholder(String),
/// Unknown quote ttl
#[error("Unknown quote ttl")]
UnknownQuoteTTL,
/// Invalid UUID
#[error("Invalid UUID: {0}")]
InvalidUuid(String),
/// QuoteNotFound
#[error("Quote not found")]
QuoteNotFound,
}
#[cfg(feature = "mint")]

View File

@@ -234,7 +234,7 @@ pub async fn create_and_start_test_mint() -> Result<Mint> {
let temp_dir = create_temp_dir("cdk-test-sqlite-mint")?;
let path = temp_dir.join("mint.db").to_str().unwrap().to_string();
Arc::new(
cdk_sqlite::MintSqliteDatabase::new(&path)
cdk_sqlite::MintSqliteDatabase::new(path.as_str())
.await
.expect("Could not create sqlite db"),
)
@@ -310,7 +310,7 @@ pub async fn create_test_wallet_for_mint(mint: Mint) -> Result<Wallet> {
// Create a temporary directory for SQLite database
let temp_dir = create_temp_dir("cdk-test-sqlite-wallet")?;
let path = temp_dir.join("wallet.db").to_str().unwrap().to_string();
let database = cdk_sqlite::WalletSqliteDatabase::new(&path)
let database = cdk_sqlite::WalletSqliteDatabase::new(path.as_str())
.await
.expect("Could not create sqlite db");
Arc::new(database)

View File

@@ -211,7 +211,7 @@ async fn setup_sqlite_database(
#[cfg(feature = "sqlcipher")]
let db = {
// Get password from command line arguments for sqlcipher
MintSqliteDatabase::new(&sql_db_path, _password.unwrap()).await?
MintSqliteDatabase::new((sql_db_path, _password.unwrap())).await?
};
Ok(Arc::new(db))
}
@@ -486,7 +486,7 @@ async fn setup_authentication(
#[cfg(feature = "sqlcipher")]
let password = CLIArgs::parse().password;
#[cfg(feature = "sqlcipher")]
let sqlite_db = MintSqliteAuthDatabase::new(&sql_db_path, password).await?;
let sqlite_db = MintSqliteAuthDatabase::new((sql_db_path, password)).await?;
#[cfg(not(feature = "sqlcipher"))]
let sqlite_db = MintSqliteAuthDatabase::new(&sql_db_path).await?;
Arc::new(sqlite_db)

View File

@@ -108,7 +108,7 @@ pub async fn cli_main() -> Result<()> {
#[cfg(feature = "sqlcipher")]
let db = {
match args.password {
Some(pass) => MintSqliteDatabase::new(&sql_path, pass).await?,
Some(pass) => MintSqliteDatabase::new((&sql_path, pass)).await?,
None => bail!("Missing database password"),
}
};

View File

@@ -0,0 +1,30 @@
[package]
name = "cdk-sql-common"
version.workspace = true
edition.workspace = true
authors = ["CDK Developers"]
description = "Generic SQL storage backend for CDK"
license.workspace = true
homepage = "https://github.com/cashubtc/cdk"
repository = "https://github.com/cashubtc/cdk.git"
rust-version.workspace = true # MSRV
readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["mint", "wallet", "auth"]
mint = ["cdk-common/mint"]
wallet = ["cdk-common/wallet"]
auth = ["cdk-common/auth"]
[dependencies]
async-trait.workspace = true
cdk-common = { workspace = true, features = ["test"] }
bitcoin.workspace = true
thiserror.workspace = true
tracing.workspace = true
serde.workspace = true
serde_json.workspace = true
lightning-invoice.workspace = true
uuid.workspace = true
once_cell.workspace = true

View File

@@ -0,0 +1,24 @@
# cdk-sql-common
This is a private crate offering a common framework to interact with SQL databases.
This crate uses standard SQL, a generic migration framework and traits to implement blocking or
non-blocking clients.
**ALPHA** This library is in early development, the API will change and should be used with caution.
## Features
The following crate feature flags are available:
| Feature | Default | Description |
|-------------|:-------:|------------------------------------|
| `wallet` | Yes | Enable cashu wallet features |
| `mint`      | Yes     | Enable cashu mint features         |
| `auth` | Yes | Enable cashu mint auth features |
## License
This project is licensed under the [MIT License](../../LICENSE).

View File

@@ -18,18 +18,36 @@ fn main() {
let dest_path = parent.join("migrations.rs");
let mut out_file = File::create(&dest_path).expect("Failed to create migrations.rs");
writeln!(out_file, "// @generated").unwrap();
writeln!(out_file, "// Auto-generated by build.rs").unwrap();
writeln!(out_file, "pub static MIGRATIONS: &[(&str, &str)] = &[").unwrap();
let skip_name = migration_path.to_str().unwrap_or_default().len();
writeln!(out_file, "/// @generated").unwrap();
writeln!(out_file, "/// Auto-generated by build.rs").unwrap();
writeln!(
out_file,
"pub static MIGRATIONS: &[(&str, &str, &str)] = &["
)
.unwrap();
for path in &files {
let name = path.file_name().unwrap().to_string_lossy();
let parts = path.to_str().unwrap().replace("\\", "/")[skip_name + 1..]
.split("/")
.map(|x| x.to_owned())
.collect::<Vec<_>>();
let prefix = if parts.len() == 2 {
parts.first().map(|x| x.to_owned()).unwrap_or_default()
} else {
"".to_owned()
};
let rel_name = &path.file_name().unwrap().to_str().unwrap();
let rel_path = &path.to_str().unwrap().replace("\\", "/")[skip_path..]; // for Windows
writeln!(
out_file,
" (\"{name}\", include_str!(r#\".{rel_path}\"#)),"
" (\"{prefix}\", \"{rel_name}\", include_str!(r#\".{rel_path}\"#)),"
)
.unwrap();
println!("cargo:rerun-if-changed={}", path.display());
}
writeln!(out_file, "];").unwrap();

View File

@@ -0,0 +1,44 @@
use crate::database::DatabaseExecutor;
use crate::stmt::query;
/// Migrates the migration generated by `build.rs`
///
/// Creates the `migrations` bookkeeping table if needed, then walks the
/// `(prefix, name, sql)` tuples produced by `build.rs`, applying every
/// migration whose prefix is empty (engine-agnostic) or equals `db_prefix`
/// (e.g. "sqlite") and whose `name` is not yet recorded as applied.
///
/// Errors are propagated from the underlying executor; a failed migration
/// is not recorded as applied.
#[inline(always)]
pub async fn migrate<C: DatabaseExecutor>(
conn: &C,
db_prefix: &str,
migrations: &[(&str, &str, &str)],
) -> Result<(), cdk_common::database::Error> {
// Bookkeeping table: one row per applied migration, keyed by file name.
query(
r#"
CREATE TABLE IF NOT EXISTS migrations (
name TEXT PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"#,
)?
.execute(conn)
.await?;
// Apply each migration if it hasn't been applied yet
for (prefix, name, sql) in migrations {
// Skip migrations that target a different database engine.
if !prefix.is_empty() && *prefix != db_prefix {
continue;
}
let is_missing = query("SELECT name FROM migrations WHERE name = :name")?
.bind("name", name)
.pluck(conn)
.await?
.is_none();
if is_missing {
// `batch` because a migration file may contain several statements.
query(sql)?.batch(conn).await?;
// Record the migration as applied only after it succeeded.
query(r#"INSERT INTO migrations (name) VALUES (:name)"#)?
.bind("name", name)
.execute(conn)
.await?;
}
}
Ok(())
}

View File

@@ -0,0 +1,53 @@
//! Database traits definition
use std::fmt::Debug;
use cdk_common::database::Error;
use crate::stmt::{Column, Statement};
/// Database Executor
///
/// This trait defines the expectations of a database execution
#[async_trait::async_trait]
pub trait DatabaseExecutor: Debug + Sync + Send {
/// Database driver name (used as the migration prefix, e.g. "sqlite")
fn name() -> &'static str;
/// Executes a query and returns the affected rows
async fn execute(&self, statement: Statement) -> Result<usize, Error>;
/// Runs the query and returns the first row or None
async fn fetch_one(&self, statement: Statement) -> Result<Option<Vec<Column>>, Error>;
/// Runs the query and returns all matching rows
async fn fetch_all(&self, statement: Statement) -> Result<Vec<Vec<Column>>, Error>;
/// Fetches the first row and column from a query
async fn pluck(&self, statement: Statement) -> Result<Option<Column>, Error>;
/// Batch execution
///
/// Executes a statement that may contain multiple SQL commands
/// (e.g. a migration script); no rows are returned.
async fn batch(&self, statement: Statement) -> Result<(), Error>;
}
/// Database transaction trait
///
/// A transaction is itself an executor: statements run through it are
/// isolated until `commit` is called, and discarded on `rollback`.
#[async_trait::async_trait]
pub trait DatabaseTransaction<'a>: Debug + DatabaseExecutor + Send + Sync {
/// Consumes the current transaction committing the changes
async fn commit(self) -> Result<(), Error>;
/// Consumes the transaction rolling back all changes
async fn rollback(self) -> Result<(), Error>;
}
/// Database connector
///
/// An established connection (or pool) that can execute statements
/// directly and open transactions.
#[async_trait::async_trait]
pub trait DatabaseConnector: Debug + DatabaseExecutor + Send + Sync {
/// Transaction type for this database connection
type Transaction<'a>: DatabaseTransaction<'a>
where
Self: 'a;
/// Begin a new transaction
async fn begin(&self) -> Result<Self::Transaction<'_>, Error>;
}

View File

@@ -0,0 +1,23 @@
//! Generic SQL storage backend for cdk
//!
//! Shared, engine-agnostic SQL layer (`cdk-sql-common`): statement
//! building, value conversion, pooling and migrations, reused by the
//! concrete database crates (e.g. cdk-sqlite).
#![warn(missing_docs)]
#![warn(rustdoc::bare_urls)]
mod common;
pub mod database;
mod macros;
pub mod pool;
pub mod stmt;
pub mod value;
pub use cdk_common::database::ConversionError;
#[cfg(feature = "mint")]
pub mod mint;
#[cfg(feature = "wallet")]
pub mod wallet;
#[cfg(feature = "mint")]
pub use mint::SQLMintDatabase;
#[cfg(feature = "wallet")]
pub use wallet::SQLWalletDatabase;

View File

@@ -1,4 +1,4 @@
//! Collection of macros to generate code to digest data from SQLite
//! Collection of macros to generate code to digest data from a generic SQL database
/// Unpacks a vector of Column, and consumes it, parsing into individual variables, checking the
/// vector is big enough.
@@ -10,9 +10,9 @@ macro_rules! unpack_into {
vec.reverse();
let required = 0 $(+ {let _ = stringify!($var); 1})+;
if vec.len() < required {
return Err(Error::MissingColumn(required, vec.len()));
Err($crate::ConversionError::MissingColumn(required, vec.len()))?;
}
Ok::<_, Error>((
Ok::<_, cdk_common::database::Error>((
$(
vec.pop().expect(&format!("Checked length already for {}", stringify!($var)))
),+
@@ -21,7 +21,7 @@ macro_rules! unpack_into {
};
}
/// Parses a SQLite column as a string or NULL
/// Parses a SQL column as a string or NULL
#[macro_export]
macro_rules! column_as_nullable_string {
($col:expr, $callback_str:expr, $callback_bytes:expr) => {
@@ -29,9 +29,9 @@ macro_rules! column_as_nullable_string {
$crate::stmt::Column::Text(text) => Ok(Some(text).and_then($callback_str)),
$crate::stmt::Column::Blob(bytes) => Ok(Some(bytes).and_then($callback_bytes)),
$crate::stmt::Column::Null => Ok(None),
other => Err(Error::InvalidType(
_ => Err($crate::ConversionError::InvalidType(
"String".to_owned(),
other.data_type().to_string(),
stringify!($col).to_owned(),
)),
})?
};
@@ -42,9 +42,9 @@ macro_rules! column_as_nullable_string {
Ok(Some(String::from_utf8_lossy(&bytes)).and_then($callback_str))
}
$crate::stmt::Column::Null => Ok(None),
other => Err(Error::InvalidType(
_ => Err($crate::ConversionError::InvalidType(
"String".to_owned(),
other.data_type().to_string(),
stringify!($col).to_owned(),
)),
})?
};
@@ -55,9 +55,9 @@ macro_rules! column_as_nullable_string {
Ok(Some(String::from_utf8_lossy(&bytes).to_string()))
}
$crate::stmt::Column::Null => Ok(None),
other => Err(Error::InvalidType(
_ => Err($crate::ConversionError::InvalidType(
"String".to_owned(),
other.data_type().to_string(),
stringify!($col).to_owned(),
)),
})?
};
@@ -69,15 +69,21 @@ macro_rules! column_as_nullable_number {
($col:expr) => {
(match $col {
$crate::stmt::Column::Text(text) => Ok(Some(text.parse().map_err(|_| {
Error::InvalidConversion(stringify!($col).to_owned(), "Number".to_owned())
$crate::ConversionError::InvalidConversion(
stringify!($col).to_owned(),
"Number".to_owned(),
)
})?)),
$crate::stmt::Column::Integer(n) => Ok(Some(n.try_into().map_err(|_| {
Error::InvalidConversion(stringify!($col).to_owned(), "Number".to_owned())
$crate::ConversionError::InvalidConversion(
stringify!($col).to_owned(),
"Number".to_owned(),
)
})?)),
$crate::stmt::Column::Null => Ok(None),
other => Err(Error::InvalidType(
_ => Err($crate::ConversionError::InvalidType(
"Number".to_owned(),
other.data_type().to_string(),
stringify!($col).to_owned(),
)),
})?
};
@@ -89,14 +95,20 @@ macro_rules! column_as_number {
($col:expr) => {
(match $col {
$crate::stmt::Column::Text(text) => text.parse().map_err(|_| {
Error::InvalidConversion(stringify!($col).to_owned(), "Number".to_owned())
$crate::ConversionError::InvalidConversion(
stringify!($col).to_owned(),
"Number".to_owned(),
)
}),
$crate::stmt::Column::Integer(n) => n.try_into().map_err(|_| {
Error::InvalidConversion(stringify!($col).to_owned(), "Number".to_owned())
$crate::ConversionError::InvalidConversion(
stringify!($col).to_owned(),
"Number".to_owned(),
)
}),
other => Err(Error::InvalidType(
_ => Err($crate::ConversionError::InvalidType(
"Number".to_owned(),
other.data_type().to_string(),
stringify!($col).to_owned(),
)),
})?
};
@@ -110,51 +122,57 @@ macro_rules! column_as_nullable_binary {
$crate::stmt::Column::Text(text) => Ok(Some(text.as_bytes().to_vec())),
$crate::stmt::Column::Blob(bytes) => Ok(Some(bytes.to_owned())),
$crate::stmt::Column::Null => Ok(None),
other => Err(Error::InvalidType(
_ => Err($crate::ConversionError::InvalidType(
"String".to_owned(),
other.data_type().to_string(),
stringify!($col).to_owned(),
)),
})?
};
}
/// Parses a SQLite column as a binary
/// Parses a SQL column as a binary
#[macro_export]
macro_rules! column_as_binary {
($col:expr) => {
(match $col {
$crate::stmt::Column::Text(text) => Ok(text.as_bytes().to_vec()),
$crate::stmt::Column::Blob(bytes) => Ok(bytes.to_owned()),
other => Err(Error::InvalidType(
_ => Err($crate::ConversionError::InvalidType(
"String".to_owned(),
other.data_type().to_string(),
stringify!($col).to_owned(),
)),
})?
};
}
/// Parses a SQLite column as a string
/// Parses a SQL column as a string
#[macro_export]
macro_rules! column_as_string {
($col:expr, $callback_str:expr, $callback_bytes:expr) => {
(match $col {
$crate::stmt::Column::Text(text) => $callback_str(&text).map_err(Error::from),
$crate::stmt::Column::Blob(bytes) => $callback_bytes(&bytes).map_err(Error::from),
other => Err(Error::InvalidType(
$crate::stmt::Column::Text(text) => {
$callback_str(&text).map_err($crate::ConversionError::from)
}
$crate::stmt::Column::Blob(bytes) => {
$callback_bytes(&bytes).map_err($crate::ConversionError::from)
}
_ => Err($crate::ConversionError::InvalidType(
"String".to_owned(),
other.data_type().to_string(),
stringify!($col).to_owned(),
)),
})?
};
($col:expr, $callback:expr) => {
(match $col {
$crate::stmt::Column::Text(text) => $callback(&text).map_err(Error::from),
$crate::stmt::Column::Blob(bytes) => {
$callback(&String::from_utf8_lossy(&bytes)).map_err(Error::from)
$crate::stmt::Column::Text(text) => {
$callback(&text).map_err($crate::ConversionError::from)
}
other => Err(Error::InvalidType(
$crate::stmt::Column::Blob(bytes) => {
$callback(&String::from_utf8_lossy(&bytes)).map_err($crate::ConversionError::from)
}
_ => Err($crate::ConversionError::InvalidType(
"String".to_owned(),
other.data_type().to_string(),
stringify!($col).to_owned(),
)),
})?
};
@@ -162,9 +180,9 @@ macro_rules! column_as_string {
(match $col {
$crate::stmt::Column::Text(text) => Ok(text.to_owned()),
$crate::stmt::Column::Blob(bytes) => Ok(String::from_utf8_lossy(&bytes).to_string()),
other => Err(Error::InvalidType(
_ => Err($crate::ConversionError::InvalidType(
"String".to_owned(),
other.data_type().to_string(),
stringify!($col).to_owned(),
)),
})?
};

View File

@@ -0,0 +1,5 @@
/// @generated
/// Auto-generated by build.rs
pub static MIGRATIONS: &[(&str, &str, &str)] = &[
("sqlite", "20250109143347_init.sql", include_str!(r#"./migrations/sqlite/20250109143347_init.sql"#)),
];

View File

@@ -1,8 +1,7 @@
//! SQLite Mint Auth
//! SQL Mint Auth
use std::collections::HashMap;
use std::ops::DerefMut;
use std::path::Path;
use std::marker::PhantomData;
use std::str::FromStr;
use async_trait::async_trait;
@@ -10,53 +9,57 @@ use cdk_common::database::{self, MintAuthDatabase, MintAuthTransaction};
use cdk_common::mint::MintKeySetInfo;
use cdk_common::nuts::{AuthProof, BlindSignature, Id, PublicKey, State};
use cdk_common::{AuthRequired, ProtectedEndpoint};
use migrations::MIGRATIONS;
use tracing::instrument;
use super::async_rusqlite::AsyncRusqlite;
use super::{sqlite_row_to_blind_signature, sqlite_row_to_keyset_info, SqliteTransaction};
use super::{sql_row_to_blind_signature, sql_row_to_keyset_info, SQLTransaction};
use crate::column_as_string;
use crate::common::{create_sqlite_pool, migrate};
use crate::mint::async_rusqlite::query;
use crate::common::migrate;
use crate::database::{DatabaseConnector, DatabaseTransaction};
use crate::mint::Error;
use crate::stmt::query;
/// Mint SQLite Database
/// Mint SQL Database
#[derive(Debug, Clone)]
pub struct MintSqliteAuthDatabase {
pool: AsyncRusqlite,
pub struct SQLMintAuthDatabase<DB>
where
DB: DatabaseConnector,
{
db: DB,
}
impl<DB> SQLMintAuthDatabase<DB>
where
DB: DatabaseConnector,
{
/// Creates a new instance
pub async fn new<X>(db: X) -> Result<Self, Error>
where
X: Into<DB>,
{
let db = db.into();
Self::migrate(&db).await?;
Ok(Self { db })
}
/// Migrate
async fn migrate(conn: &DB) -> Result<(), Error> {
let tx = conn.begin().await?;
migrate(&tx, DB::name(), MIGRATIONS).await?;
tx.commit().await?;
Ok(())
}
}
#[rustfmt::skip]
mod migrations;
impl MintSqliteAuthDatabase {
/// Create new [`MintSqliteAuthDatabase`]
#[cfg(not(feature = "sqlcipher"))]
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let pool = create_sqlite_pool(path.as_ref().to_str().ok_or(Error::InvalidDbPath)?);
migrate(pool.get()?.deref_mut(), migrations::MIGRATIONS)?;
Ok(Self {
pool: AsyncRusqlite::new(pool),
})
}
/// Create new [`MintSqliteAuthDatabase`]
#[cfg(feature = "sqlcipher")]
pub async fn new<P: AsRef<Path>>(path: P, password: String) -> Result<Self, Error> {
let pool = create_sqlite_pool(
path.as_ref().to_str().ok_or(Error::InvalidDbPath)?,
password,
);
migrate(pool.get()?.deref_mut(), migrations::MIGRATIONS)?;
Ok(Self {
pool: AsyncRusqlite::new(pool),
})
}
}
#[async_trait]
impl MintAuthTransaction<database::Error> for SqliteTransaction<'_> {
impl<'a, T> MintAuthTransaction<database::Error> for SQLTransaction<'a, T>
where
T: DatabaseTransaction<'a>,
{
#[instrument(skip(self))]
async fn set_active_keyset(&mut self, id: Id) -> Result<(), database::Error> {
tracing::info!("Setting auth keyset {id} active");
@@ -68,8 +71,8 @@ impl MintAuthTransaction<database::Error> for SqliteTransaction<'_> {
ELSE FALSE
END;
"#,
)
.bind(":id", id.to_string())
)?
.bind("id", id.to_string())
.execute(&self.inner)
.await?;
@@ -97,15 +100,15 @@ impl MintAuthTransaction<database::Error> for SqliteTransaction<'_> {
max_order = excluded.max_order,
derivation_path_index = excluded.derivation_path_index
"#,
)
.bind(":id", keyset.id.to_string())
.bind(":unit", keyset.unit.to_string())
.bind(":active", keyset.active)
.bind(":valid_from", keyset.valid_from as i64)
.bind(":valid_to", keyset.final_expiry.map(|v| v as i64))
.bind(":derivation_path", keyset.derivation_path.to_string())
.bind(":max_order", keyset.max_order)
.bind(":derivation_path_index", keyset.derivation_path_index)
)?
.bind("id", keyset.id.to_string())
.bind("unit", keyset.unit.to_string())
.bind("active", keyset.active)
.bind("valid_from", keyset.valid_from as i64)
.bind("valid_to", keyset.final_expiry.map(|v| v as i64))
.bind("derivation_path", keyset.derivation_path.to_string())
.bind("max_order", keyset.max_order)
.bind("derivation_path_index", keyset.derivation_path_index)
.execute(&self.inner)
.await?;
@@ -120,12 +123,12 @@ impl MintAuthTransaction<database::Error> for SqliteTransaction<'_> {
VALUES
(:y, :keyset_id, :secret, :c, :state)
"#,
)
.bind(":y", proof.y()?.to_bytes().to_vec())
.bind(":keyset_id", proof.keyset_id.to_string())
.bind(":secret", proof.secret.to_string())
.bind(":c", proof.c.to_bytes().to_vec())
.bind(":state", "UNSPENT".to_string())
)?
.bind("y", proof.y()?.to_bytes().to_vec())
.bind("keyset_id", proof.keyset_id.to_string())
.bind("secret", proof.secret.to_string())
.bind("c", proof.c.to_bytes().to_vec())
.bind("state", "UNSPENT".to_string())
.execute(&self.inner)
.await
{
@@ -139,20 +142,20 @@ impl MintAuthTransaction<database::Error> for SqliteTransaction<'_> {
y: &PublicKey,
proofs_state: State,
) -> Result<Option<State>, Self::Err> {
let current_state = query(r#"SELECT state FROM proof WHERE y = :y"#)
.bind(":y", y.to_bytes().to_vec())
let current_state = query(r#"SELECT state FROM proof WHERE y = :y"#)?
.bind("y", y.to_bytes().to_vec())
.pluck(&self.inner)
.await?
.map(|state| Ok::<_, Error>(column_as_string!(state, State::from_str)))
.transpose()?;
query(r#"UPDATE proof SET state = :new_state WHERE state = :state AND y = :y"#)
.bind(":y", y.to_bytes().to_vec())
query(r#"UPDATE proof SET state = :new_state WHERE state = :state AND y = :y"#)?
.bind("y", y.to_bytes().to_vec())
.bind(
":state",
"state",
current_state.as_ref().map(|state| state.to_string()),
)
.bind(":new_state", proofs_state.to_string())
.bind("new_state", proofs_state.to_string())
.execute(&self.inner)
.await?;
@@ -173,11 +176,11 @@ impl MintAuthTransaction<database::Error> for SqliteTransaction<'_> {
VALUES
(:y, :amount, :keyset_id, :c)
"#,
)
.bind(":y", message.to_bytes().to_vec())
.bind(":amount", u64::from(signature.amount) as i64)
.bind(":keyset_id", signature.keyset_id.to_string())
.bind(":c", signature.c.to_bytes().to_vec())
)?
.bind("y", message.to_bytes().to_vec())
.bind("amount", u64::from(signature.amount) as i64)
.bind("keyset_id", signature.keyset_id.to_string())
.bind("c", signature.c.to_bytes().to_vec())
.execute(&self.inner)
.await?;
}
@@ -196,9 +199,9 @@ impl MintAuthTransaction<database::Error> for SqliteTransaction<'_> {
(endpoint, auth)
VALUES (:endpoint, :auth);
"#,
)
.bind(":endpoint", serde_json::to_string(endpoint)?)
.bind(":auth", serde_json::to_string(auth)?)
)?
.bind("endpoint", serde_json::to_string(endpoint)?)
.bind("auth", serde_json::to_string(auth)?)
.execute(&self.inner)
.await
{
@@ -215,9 +218,9 @@ impl MintAuthTransaction<database::Error> for SqliteTransaction<'_> {
&mut self,
protected_endpoints: Vec<ProtectedEndpoint>,
) -> Result<(), database::Error> {
query(r#"DELETE FROM protected_endpoints WHERE endpoint IN (:endpoints)"#)
query(r#"DELETE FROM protected_endpoints WHERE endpoint IN (:endpoints)"#)?
.bind_vec(
":endpoints",
"endpoints",
protected_endpoints
.iter()
.map(serde_json::to_string)
@@ -230,15 +233,19 @@ impl MintAuthTransaction<database::Error> for SqliteTransaction<'_> {
}
#[async_trait]
impl MintAuthDatabase for MintSqliteAuthDatabase {
impl<DB> MintAuthDatabase for SQLMintAuthDatabase<DB>
where
DB: DatabaseConnector,
{
type Err = database::Error;
async fn begin_transaction<'a>(
&'a self,
) -> Result<Box<dyn MintAuthTransaction<database::Error> + Send + Sync + 'a>, database::Error>
{
Ok(Box::new(SqliteTransaction {
inner: self.pool.begin().await?,
Ok(Box::new(SQLTransaction {
inner: self.db.begin().await?,
_phantom: PhantomData,
}))
}
@@ -252,8 +259,8 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
WHERE
active = 1;
"#,
)
.pluck(&self.pool)
)?
.pluck(&self.db)
.await?
.map(|id| Ok::<_, Error>(column_as_string!(id, Id::from_str, Id::from_bytes)))
.transpose()?)
@@ -274,11 +281,11 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
FROM
keyset
WHERE id=:id"#,
)
.bind(":id", id.to_string())
.fetch_one(&self.pool)
)?
.bind("id", id.to_string())
.fetch_one(&self.db)
.await?
.map(sqlite_row_to_keyset_info)
.map(sql_row_to_keyset_info)
.transpose()?)
}
@@ -297,18 +304,18 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
FROM
keyset
WHERE id=:id"#,
)
.fetch_all(&self.pool)
)?
.fetch_all(&self.db)
.await?
.into_iter()
.map(sqlite_row_to_keyset_info)
.map(sql_row_to_keyset_info)
.collect::<Result<Vec<_>, _>>()?)
}
async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
let mut current_states = query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)
.bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
.fetch_all(&self.pool)
let mut current_states = query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)?
.bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
.fetch_all(&self.db)
.await?
.into_iter()
.map(|row| {
@@ -338,15 +345,15 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
blind_signature
WHERE y IN (:y)
"#,
)
)?
.bind_vec(
":y",
"y",
blinded_messages
.iter()
.map(|y| y.to_bytes().to_vec())
.collect(),
)
.fetch_all(&self.pool)
.fetch_all(&self.db)
.await?
.into_iter()
.map(|mut row| {
@@ -356,7 +363,7 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
PublicKey::from_hex,
PublicKey::from_slice
),
sqlite_row_to_blind_signature(row)?,
sql_row_to_blind_signature(row)?,
))
})
.collect::<Result<HashMap<_, _>, Error>>()?;
@@ -371,9 +378,9 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
protected_endpoint: ProtectedEndpoint,
) -> Result<Option<AuthRequired>, Self::Err> {
Ok(
query(r#"SELECT auth FROM protected_endpoints WHERE endpoint = :endpoint"#)
.bind(":endpoint", serde_json::to_string(&protected_endpoint)?)
.pluck(&self.pool)
query(r#"SELECT auth FROM protected_endpoints WHERE endpoint = :endpoint"#)?
.bind("endpoint", serde_json::to_string(&protected_endpoint)?)
.pluck(&self.db)
.await?
.map(|auth| {
Ok::<_, Error>(column_as_string!(
@@ -389,8 +396,8 @@ impl MintAuthDatabase for MintSqliteAuthDatabase {
async fn get_auth_for_endpoints(
&self,
) -> Result<HashMap<ProtectedEndpoint, Option<AuthRequired>>, Self::Err> {
Ok(query(r#"SELECT endpoint, auth FROM protected_endpoints"#)
.fetch_all(&self.pool)
Ok(query(r#"SELECT endpoint, auth FROM protected_endpoints"#)?
.fetch_all(&self.db)
.await?
.into_iter()
.map(|row| {

View File

@@ -0,0 +1,26 @@
/// @generated
/// Auto-generated by build.rs
pub static MIGRATIONS: &[(&str, &str, &str)] = &[
("sqlite", "1_fix_sqlx_migration.sql", include_str!(r#"./migrations/sqlite/1_fix_sqlx_migration.sql"#)),
("sqlite", "20240612124932_init.sql", include_str!(r#"./migrations/sqlite/20240612124932_init.sql"#)),
("sqlite", "20240618195700_quote_state.sql", include_str!(r#"./migrations/sqlite/20240618195700_quote_state.sql"#)),
("sqlite", "20240626092101_nut04_state.sql", include_str!(r#"./migrations/sqlite/20240626092101_nut04_state.sql"#)),
("sqlite", "20240703122347_request_lookup_id.sql", include_str!(r#"./migrations/sqlite/20240703122347_request_lookup_id.sql"#)),
("sqlite", "20240710145043_input_fee.sql", include_str!(r#"./migrations/sqlite/20240710145043_input_fee.sql"#)),
("sqlite", "20240711183109_derivation_path_index.sql", include_str!(r#"./migrations/sqlite/20240711183109_derivation_path_index.sql"#)),
("sqlite", "20240718203721_allow_unspent.sql", include_str!(r#"./migrations/sqlite/20240718203721_allow_unspent.sql"#)),
("sqlite", "20240811031111_update_mint_url.sql", include_str!(r#"./migrations/sqlite/20240811031111_update_mint_url.sql"#)),
("sqlite", "20240919103407_proofs_quote_id.sql", include_str!(r#"./migrations/sqlite/20240919103407_proofs_quote_id.sql"#)),
("sqlite", "20240923153640_melt_requests.sql", include_str!(r#"./migrations/sqlite/20240923153640_melt_requests.sql"#)),
("sqlite", "20240930101140_dleq_for_sigs.sql", include_str!(r#"./migrations/sqlite/20240930101140_dleq_for_sigs.sql"#)),
("sqlite", "20241108093102_mint_mint_quote_pubkey.sql", include_str!(r#"./migrations/sqlite/20241108093102_mint_mint_quote_pubkey.sql"#)),
("sqlite", "20250103201327_amount_to_pay_msats.sql", include_str!(r#"./migrations/sqlite/20250103201327_amount_to_pay_msats.sql"#)),
("sqlite", "20250129200912_remove_mint_url.sql", include_str!(r#"./migrations/sqlite/20250129200912_remove_mint_url.sql"#)),
("sqlite", "20250129230326_add_config_table.sql", include_str!(r#"./migrations/sqlite/20250129230326_add_config_table.sql"#)),
("sqlite", "20250307213652_keyset_id_as_foreign_key.sql", include_str!(r#"./migrations/sqlite/20250307213652_keyset_id_as_foreign_key.sql"#)),
("sqlite", "20250406091754_mint_time_of_quotes.sql", include_str!(r#"./migrations/sqlite/20250406091754_mint_time_of_quotes.sql"#)),
("sqlite", "20250406093755_mint_created_time_signature.sql", include_str!(r#"./migrations/sqlite/20250406093755_mint_created_time_signature.sql"#)),
("sqlite", "20250415093121_drop_keystore_foreign.sql", include_str!(r#"./migrations/sqlite/20250415093121_drop_keystore_foreign.sql"#)),
("sqlite", "20250626120251_rename_blind_message_y_to_b.sql", include_str!(r#"./migrations/sqlite/20250626120251_rename_blind_message_y_to_b.sql"#)),
("sqlite", "20250706101057_bolt12.sql", include_str!(r#"./migrations/sqlite/20250706101057_bolt12.sql"#)),
];

View File

@@ -0,0 +1,20 @@
-- Migrate `_sqlx_migrations` to our new migration system
-- Create a zero-row shim `_sqlx_migrations` table (WHERE 0 selects nothing)
-- so the statements below are valid even when sqlx was never used; if the
-- real sqlx table exists, IF NOT EXISTS leaves it untouched.
CREATE TABLE IF NOT EXISTS _sqlx_migrations AS
SELECT
'' AS version,
'' AS description,
0 AS execution_time
WHERE 0;
-- Re-register each previously applied sqlx migration under the new naming
-- scheme: `<version>_<description with spaces replaced by underscores>.sql`.
-- With only the shim present this inserts nothing (the shim has no rows).
-- NOTE(review): the second selected column (execution_time) lands in
-- migrations.applied_at, which is declared as a TIMESTAMP -- confirm intended.
INSERT INTO migrations
SELECT
version || '_' || REPLACE(description, ' ', '_') || '.sql',
execution_time
FROM _sqlx_migrations
WHERE EXISTS (
SELECT 1
FROM sqlite_master
WHERE type = 'table' AND name = '_sqlx_migrations'
);
-- Clean up: drop the sqlx table (real or shim); bookkeeping now lives in `migrations`.
DROP TABLE _sqlx_migrations;

View File

@@ -1,5 +1,5 @@
-- Add foreign key constraints for keyset_id in SQLite
-- SQLite requires recreating tables to add foreign keys
-- Some SQL engines (notably SQLite) require recreating tables to add foreign keys
-- First, ensure we have the right schema information
PRAGMA foreign_keys = OFF;

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
@@ -30,13 +30,20 @@ pub trait ResourceManager: Debug {
type Resource: Debug;
/// The configuration that is needed in order to create the resource
type Config: Debug;
type Config: Clone + Debug;
/// The error the resource may return when creating a new instance
type Error: Debug;
/// Creates a new resource with a given config
fn new_resource(config: &Self::Config) -> Result<Self::Resource, Error<Self::Error>>;
/// Creates a new resource with a given config.
///
/// If `stale` is ever set to TRUE it is assumed the resource is no longer valid and it will be
/// dropped.
fn new_resource(
config: &Self::Config,
stale: Arc<AtomicBool>,
timeout: Duration,
) -> Result<Self::Resource, Error<Self::Error>>;
/// The object is dropped
fn drop(_resource: Self::Resource) {}
@@ -49,7 +56,7 @@ where
RM: ResourceManager,
{
config: RM::Config,
queue: Mutex<Vec<RM::Resource>>,
queue: Mutex<Vec<(Arc<AtomicBool>, RM::Resource)>>,
in_use: AtomicUsize,
max_size: usize,
default_timeout: Duration,
@@ -61,7 +68,7 @@ pub struct PooledResource<RM>
where
RM: ResourceManager,
{
resource: Option<RM::Resource>,
resource: Option<(Arc<AtomicBool>, RM::Resource)>,
pool: Arc<Pool<RM>>,
}
@@ -88,7 +95,7 @@ where
type Target = RM::Resource;
fn deref(&self) -> &Self::Target {
self.resource.as_ref().expect("resource already dropped")
&self.resource.as_ref().expect("resource already dropped").1
}
}
@@ -97,7 +104,7 @@ where
RM: ResourceManager,
{
fn deref_mut(&mut self) -> &mut Self::Target {
self.resource.as_mut().expect("resource already dropped")
&mut self.resource.as_mut().expect("resource already dropped").1
}
}
@@ -135,22 +142,28 @@ where
let mut resources = self.queue.lock().map_err(|_| Error::Poison)?;
loop {
if let Some(resource) = resources.pop() {
drop(resources);
self.in_use.fetch_add(1, Ordering::AcqRel);
if let Some((stale, resource)) = resources.pop() {
if !stale.load(Ordering::SeqCst) {
drop(resources);
self.in_use.fetch_add(1, Ordering::AcqRel);
return Ok(PooledResource {
resource: Some(resource),
pool: self.clone(),
});
return Ok(PooledResource {
resource: Some((stale, resource)),
pool: self.clone(),
});
}
}
if self.in_use.load(Ordering::Relaxed) < self.max_size {
drop(resources);
self.in_use.fetch_add(1, Ordering::AcqRel);
let stale: Arc<AtomicBool> = Arc::new(false.into());
return Ok(PooledResource {
resource: Some(RM::new_resource(&self.config)?),
resource: Some((
stale.clone(),
RM::new_resource(&self.config, stale, timeout)?,
)),
pool: self.clone(),
});
}
@@ -178,7 +191,7 @@ where
if let Ok(mut resources) = self.queue.lock() {
loop {
while let Some(resource) = resources.pop() {
RM::drop(resource);
RM::drop(resource.1);
}
if self.in_use.load(Ordering::Relaxed) == 0 {

View File

@@ -0,0 +1,359 @@
//! Statements module
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use cdk_common::database::Error;
use once_cell::sync::Lazy;
use crate::database::DatabaseExecutor;
use crate::value::Value;
/// A single column value in a result row (alias of the generic [`Value`]).
pub type Column = Value;

/// How the caller intends to consume the result of a SQL statement.
///
/// The executor uses this to choose which driver call to issue and which
/// response shape to hand back.
#[derive(Debug, Clone, Copy, Default)]
pub enum ExpectedSqlResponse {
    /// Fetch at most one row (the first row, if any)
    SingleRow,
    /// Fetch every row that matches the query (the default)
    #[default]
    ManyRows,
    /// Execute the statement and report how many rows were affected
    AffectedRows,
    /// Return only the first column of the first row, if any
    Pluck,
    /// Execute the statement as a raw batch of commands
    Batch,
}
/// A value bound to a placeholder in a SQL statement.
#[derive(Debug, Clone)]
pub enum PlaceholderValue {
    /// A single scalar value (renders as one `$n` placeholder)
    Value(Value),
    /// A set of values (renders as `$n, $n+1, …`, e.g. for `IN (…)` clauses)
    Set(Vec<Value>),
}

impl From<Value> for PlaceholderValue {
    fn from(value: Value) -> Self {
        PlaceholderValue::Value(value)
    }
}

impl From<Vec<Value>> for PlaceholderValue {
    fn from(value: Vec<Value>) -> Self {
        PlaceholderValue::Set(value)
    }
}

/// One fragment of a parsed SQL statement.
#[derive(Debug, Clone)]
pub enum SqlPart {
    /// Raw SQL text, passed through untouched
    Raw(Arc<str>),
    /// A named `:placeholder` together with the value bound to it (if any)
    Placeholder(Arc<str>, Option<PlaceholderValue>),
}

/// SQL parser error
#[derive(Debug, PartialEq, thiserror::Error)]
pub enum SqlParseError {
    /// A string literal was opened but never closed
    #[error("Unterminated String literal")]
    UnterminatedStringLiteral,
    /// A `:` was not followed by a valid placeholder name
    #[error("Invalid placeholder name")]
    InvalidPlaceholder,
}
/// Rudimentary SQL parser.
///
/// This function does not validate the SQL statement; it only splits the text
/// into raw fragments and named `:placeholder` tokens so the statement can be
/// rendered in a database-agnostic way.
///
/// String literals (single- or double-quoted, with `''`-style doubled-quote
/// escapes) are copied through verbatim, so a `:` inside a literal is never
/// treated as a placeholder. A doubled colon (`::`, the SQL cast operator,
/// e.g. Postgres `expr::integer`) is kept as raw SQL instead of being
/// rejected as an invalid placeholder.
///
/// # Errors
///
/// * [`SqlParseError::UnterminatedStringLiteral`] if a quoted literal is not
///   closed before the end of the input.
/// * [`SqlParseError::InvalidPlaceholder`] if a single `:` is not followed by
///   an alphanumeric/underscore name.
pub fn split_sql_parts(input: &str) -> Result<Vec<SqlPart>, SqlParseError> {
    let mut parts = Vec::new();
    let mut current = String::new();
    let mut chars = input.chars().peekable();

    while let Some(&c) = chars.peek() {
        match c {
            '\'' | '"' => {
                // Start of string literal: copy it verbatim into `current`.
                let quote = c;
                current.push(chars.next().unwrap());
                let mut closed = false;

                while let Some(&next) = chars.peek() {
                    current.push(chars.next().unwrap());
                    if next == quote {
                        if chars.peek() == Some(&quote) {
                            // Escaped quote (e.g. '' inside strings)
                            current.push(chars.next().unwrap());
                        } else {
                            closed = true;
                            break;
                        }
                    }
                }

                if !closed {
                    return Err(SqlParseError::UnterminatedStringLiteral);
                }
            }
            ':' => {
                chars.next(); // consume ':'

                // `::` is the SQL cast operator (e.g. Postgres `expr::int`),
                // not a placeholder: keep it as raw SQL.
                if chars.peek() == Some(&':') {
                    chars.next();
                    current.push_str("::");
                    continue;
                }

                // Flush current raw SQL
                if !current.is_empty() {
                    parts.push(SqlPart::Raw(current.clone().into()));
                    current.clear();
                }

                let mut name = String::new();
                while let Some(&next) = chars.peek() {
                    if next.is_alphanumeric() || next == '_' {
                        name.push(chars.next().unwrap());
                    } else {
                        break;
                    }
                }

                if name.is_empty() {
                    return Err(SqlParseError::InvalidPlaceholder);
                }

                parts.push(SqlPart::Placeholder(name.into(), None));
            }
            _ => {
                current.push(chars.next().unwrap());
            }
        }
    }

    if !current.is_empty() {
        parts.push(SqlPart::Raw(current.into()));
    }

    Ok(parts)
}
/// Cache of parsed SQL: maps the original statement text to its parsed parts
/// and, once `to_sql` has rendered it, the final `$1..$n` SQL string.
type Cache = HashMap<String, (Vec<SqlPart>, Option<Arc<str>>)>;

/// Sql message
///
/// A SQL statement plus the values bound to its placeholders.
#[derive(Debug, Default)]
pub struct Statement {
    // Shared parse cache this statement was created from (see `query`).
    cache: Arc<RwLock<Cache>>,
    // Fully rendered SQL from a previous `to_sql` of the same text, if cached.
    cached_sql: Option<Arc<str>>,
    // Original SQL text; kept on a cache miss so `to_sql` can backfill the cache.
    sql: Option<String>,
    /// The SQL statement
    pub parts: Vec<SqlPart>,
    /// The expected response type
    pub expected_response: ExpectedSqlResponse,
}
impl Statement {
    /// Creates a new statement, reusing a previous parse of the same SQL text
    /// from `cache` when one is available.
    fn new(sql: &str, cache: Arc<RwLock<Cache>>) -> Result<Self, SqlParseError> {
        let parsed = cache
            .read()
            .map(|cache| cache.get(sql).cloned())
            .ok()
            .flatten();

        if let Some((parts, cached_sql)) = parsed {
            // Cache hit: reuse the parsed parts (and the rendered SQL, if any).
            Ok(Self {
                parts,
                cached_sql,
                sql: None,
                cache,
                ..Default::default()
            })
        } else {
            // Cache miss: parse now and register the parts; the rendered SQL
            // slot stays `None` until `to_sql` backfills it.
            let parts = split_sql_parts(sql)?;

            if let Ok(mut cache) = cache.write() {
                cache.insert(sql.to_owned(), (parts.clone(), None));
            } else {
                tracing::warn!("Failed to acquire write lock for SQL statement cache");
            }

            Ok(Self {
                parts,
                sql: Some(sql.to_owned()),
                cache,
                ..Default::default()
            })
        }
    }

    /// Convert Statement into a SQL statement and the list of placeholders
    ///
    /// By default it converts the statement into placeholder using $1..$n placeholders which seems
    /// to be more widely supported, although it can be reimplemented with other formats since part
    /// is public
    pub fn to_sql(self) -> Result<(String, Vec<Value>), Error> {
        if let Some(cached_sql) = self.cached_sql {
            // Fast path: the rendered SQL is already known; only collect the
            // bound values, in placeholder order.
            let sql = cached_sql.to_string();
            let values = self
                .parts
                .into_iter()
                .map(|x| match x {
                    SqlPart::Placeholder(name, value) => {
                        match value.ok_or(Error::MissingPlaceholder(name.to_string()))? {
                            PlaceholderValue::Value(value) => Ok(vec![value]),
                            PlaceholderValue::Set(values) => Ok(values),
                        }
                    }
                    SqlPart::Raw(_) => Ok(vec![]),
                })
                .collect::<Result<Vec<_>, Error>>()?
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();
            return Ok((sql, values));
        }

        let mut placeholder_values = Vec::new();
        // Statements containing a `Set` placeholder expand to a variable number
        // of `$n` markers, so their rendered SQL must not be cached.
        let mut can_be_cached = true;
        let sql = self
            .parts
            .into_iter()
            .map(|x| match x {
                SqlPart::Placeholder(name, value) => {
                    match value.ok_or(Error::MissingPlaceholder(name.to_string()))? {
                        PlaceholderValue::Value(value) => {
                            placeholder_values.push(value);
                            Ok::<_, Error>(format!("${}", placeholder_values.len()))
                        }
                        PlaceholderValue::Set(mut values) => {
                            can_be_cached = false;
                            let start_size = placeholder_values.len();
                            placeholder_values.append(&mut values);
                            let placeholders = (start_size + 1..=placeholder_values.len())
                                .map(|i| format!("${i}"))
                                .collect::<Vec<_>>()
                                .join(", ");
                            Ok(placeholders)
                        }
                    }
                }
                SqlPart::Raw(raw) => Ok(raw.trim().to_string()),
            })
            .collect::<Result<Vec<String>, _>>()?
            .join(" ");

        if can_be_cached {
            if let Some(original_sql) = self.sql {
                // Backfill the rendered SQL so future statements for the same
                // text take the fast path above.
                let _ = self.cache.write().map(|mut cache| {
                    if let Some((_, cached_sql)) = cache.get_mut(&original_sql) {
                        *cached_sql = Some(sql.clone().into());
                    }
                });
            }
        }

        Ok((sql, placeholder_values))
    }

    /// Binds a given placeholder to a value.
    #[inline]
    pub fn bind<C, V>(mut self, name: C, value: V) -> Self
    where
        C: ToString,
        V: Into<Value>,
    {
        let name = name.to_string();
        let value = value.into();
        let value: PlaceholderValue = value.into();

        // Every occurrence of the named placeholder receives the value.
        for part in self.parts.iter_mut() {
            if let SqlPart::Placeholder(part_name, part_value) = part {
                if **part_name == *name.as_str() {
                    *part_value = Some(value.clone());
                }
            }
        }

        self
    }

    /// Binds a single variable with a vector.
    ///
    /// This will rewrite the function from `:foo` (where value is vec![1, 2, 3]) to `:foo0, :foo1,
    /// :foo2` and binds each value from the value vector accordingly.
    #[inline]
    pub fn bind_vec<C, V>(mut self, name: C, value: Vec<V>) -> Self
    where
        C: ToString,
        V: Into<Value>,
    {
        let name = name.to_string();
        let value: PlaceholderValue = value
            .into_iter()
            .map(|x| x.into())
            .collect::<Vec<Value>>()
            .into();

        for part in self.parts.iter_mut() {
            if let SqlPart::Placeholder(part_name, part_value) = part {
                if **part_name == *name.as_str() {
                    *part_value = Some(value.clone());
                }
            }
        }

        self
    }

    /// Executes a query and returns the first column of the first row, if any
    pub async fn pluck<C>(self, conn: &C) -> Result<Option<Value>, Error>
    where
        C: DatabaseExecutor,
    {
        conn.pluck(self).await
    }

    /// Executes the statement as a raw batch of commands
    pub async fn batch<C>(self, conn: &C) -> Result<(), Error>
    where
        C: DatabaseExecutor,
    {
        conn.batch(self).await
    }

    /// Executes a query and returns the affected rows
    pub async fn execute<C>(self, conn: &C) -> Result<usize, Error>
    where
        C: DatabaseExecutor,
    {
        conn.execute(self).await
    }

    /// Runs the query and returns the first row or None
    pub async fn fetch_one<C>(self, conn: &C) -> Result<Option<Vec<Column>>, Error>
    where
        C: DatabaseExecutor,
    {
        conn.fetch_one(self).await
    }

    /// Runs the query and returns all matching rows
    pub async fn fetch_all<C>(self, conn: &C) -> Result<Vec<Vec<Column>>, Error>
    where
        C: DatabaseExecutor,
    {
        conn.fetch_all(self).await
    }
}
/// Creates a new query statement
#[inline(always)]
pub fn query(sql: &str) -> Result<Statement, Error> {
static CACHE: Lazy<Arc<RwLock<Cache>>> = Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
Statement::new(sql, CACHE.clone()).map_err(|e| Error::Database(Box::new(e)))
}

View File

@@ -0,0 +1,82 @@
//! Generic Rust value representation for data from the database
/// Generic value representation for data exchanged with any database backend.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    /// The value is a `NULL` value.
    Null,
    /// The value is a signed integer.
    Integer(i64),
    /// The value is a floating point number.
    Real(f64),
    /// The value is a text string.
    Text(String),
    /// The value is a blob of data
    Blob(Vec<u8>),
}

impl From<String> for Value {
    fn from(value: String) -> Self {
        Self::Text(value)
    }
}

impl From<&str> for Value {
    fn from(value: &str) -> Self {
        Self::Text(value.to_owned())
    }
}

impl From<&&str> for Value {
    fn from(value: &&str) -> Self {
        Self::Text(value.to_string())
    }
}

impl From<Vec<u8>> for Value {
    fn from(value: Vec<u8>) -> Self {
        Self::Blob(value)
    }
}

impl From<&[u8]> for Value {
    fn from(value: &[u8]) -> Self {
        Self::Blob(value.to_owned())
    }
}

impl From<u8> for Value {
    fn from(value: u8) -> Self {
        Self::Integer(value.into())
    }
}

// `u16` and `i32` widen losslessly into `i64`, completing the integer
// coverage alongside the existing `u8`/`u32`/`i64` impls.
impl From<u16> for Value {
    fn from(value: u16) -> Self {
        Self::Integer(value.into())
    }
}

impl From<i32> for Value {
    fn from(value: i32) -> Self {
        Self::Integer(value.into())
    }
}

impl From<i64> for Value {
    fn from(value: i64) -> Self {
        Self::Integer(value)
    }
}

impl From<u32> for Value {
    fn from(value: u32) -> Self {
        Self::Integer(value.into())
    }
}

// Without this impl the `Real` variant could never be produced through the
// generic `Into<Value>` bounds used by `Statement::bind`.
impl From<f64> for Value {
    fn from(value: f64) -> Self {
        Self::Real(value)
    }
}

impl From<bool> for Value {
    fn from(value: bool) -> Self {
        Self::Integer(if value { 1 } else { 0 })
    }
}

impl<T> From<Option<T>> for Value
where
    T: Into<Value>,
{
    /// `None` maps to [`Value::Null`]; `Some(v)` converts the inner value.
    fn from(value: Option<T>) -> Self {
        match value {
            Some(v) => v.into(),
            None => Value::Null,
        }
    }
}

View File

@@ -2,7 +2,7 @@
use thiserror::Error;
/// SQLite Wallet Error
/// SQL Wallet Error
#[derive(Debug, Error)]
pub enum Error {
/// SQLX Error

View File

@@ -0,0 +1,21 @@
/// @generated
/// Auto-generated by build.rs
/// Tuples of (engine, migration file name, SQL body), applied in the listed
/// order by the migration runner. Do not edit by hand; regenerate via the
/// build script instead.
pub static MIGRATIONS: &[(&str, &str, &str)] = &[
    ("sqlite", "20240612132920_init.sql", include_str!(r#"./migrations/sqlite/20240612132920_init.sql"#)),
    ("sqlite", "20240618200350_quote_state.sql", include_str!(r#"./migrations/sqlite/20240618200350_quote_state.sql"#)),
    ("sqlite", "20240626091921_nut04_state.sql", include_str!(r#"./migrations/sqlite/20240626091921_nut04_state.sql"#)),
    ("sqlite", "20240710144711_input_fee.sql", include_str!(r#"./migrations/sqlite/20240710144711_input_fee.sql"#)),
    ("sqlite", "20240810214105_mint_icon_url.sql", include_str!(r#"./migrations/sqlite/20240810214105_mint_icon_url.sql"#)),
    ("sqlite", "20240810233905_update_mint_url.sql", include_str!(r#"./migrations/sqlite/20240810233905_update_mint_url.sql"#)),
    ("sqlite", "20240902151515_icon_url.sql", include_str!(r#"./migrations/sqlite/20240902151515_icon_url.sql"#)),
    ("sqlite", "20240902210905_mint_time.sql", include_str!(r#"./migrations/sqlite/20240902210905_mint_time.sql"#)),
    ("sqlite", "20241011125207_mint_urls.sql", include_str!(r#"./migrations/sqlite/20241011125207_mint_urls.sql"#)),
    ("sqlite", "20241108092756_wallet_mint_quote_secretkey.sql", include_str!(r#"./migrations/sqlite/20241108092756_wallet_mint_quote_secretkey.sql"#)),
    ("sqlite", "20250214135017_mint_tos.sql", include_str!(r#"./migrations/sqlite/20250214135017_mint_tos.sql"#)),
    ("sqlite", "20250310111513_drop_nostr_last_checked.sql", include_str!(r#"./migrations/sqlite/20250310111513_drop_nostr_last_checked.sql"#)),
    ("sqlite", "20250314082116_allow_pending_spent.sql", include_str!(r#"./migrations/sqlite/20250314082116_allow_pending_spent.sql"#)),
    ("sqlite", "20250323152040_wallet_dleq_proofs.sql", include_str!(r#"./migrations/sqlite/20250323152040_wallet_dleq_proofs.sql"#)),
    ("sqlite", "20250401120000_add_transactions_table.sql", include_str!(r#"./migrations/sqlite/20250401120000_add_transactions_table.sql"#)),
    ("sqlite", "20250616144830_add_keyset_expiry.sql", include_str!(r#"./migrations/sqlite/20250616144830_add_keyset_expiry.sql"#)),
    ("sqlite", "20250707093445_bolt12.sql", include_str!(r#"./migrations/sqlite/20250707093445_bolt12.sql"#)),
];

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,97 @@
PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE _sqlx_migrations (
version BIGINT PRIMARY KEY,
description TEXT NOT NULL,
installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
success BOOLEAN NOT NULL,
checksum BYTEA NOT NULL,
execution_time BIGINT NOT NULL
);
INSERT INTO _sqlx_migrations VALUES(20240612124932,'init','2025-06-13 20:01:04',1,X'42664ceda25b07bca420c2f7480c90334cb8a720203c1b4b8971181d5d3afabda3171aa89c1c0c8a26421eded94b77fa',921834);
INSERT INTO _sqlx_migrations VALUES(20240618195700,'quote state','2025-06-13 20:01:04',1,X'4b3a5a7f91032320f32b2c60a4348f0e80cef98fcf58153c4c942aa5124ddadce7c5c4338f29d2cb672fc4c08dd894a6',1019333);
INSERT INTO _sqlx_migrations VALUES(20240626092101,'nut04 state','2025-06-13 20:01:04',1,X'3641316faa018b13892d2972010b26a68d48b499aa67f8c084587265d070b575f541f165a9e2c5653b9c81a8dc198843',814000);
INSERT INTO _sqlx_migrations VALUES(20240703122347,'request lookup id','2025-06-13 20:01:04',1,X'234851aa0990048e119d07e9844f064ee71731c4e21021934e733359d6c50bc95a40051673f0a06e82d151c34fff6e8a',430875);
INSERT INTO _sqlx_migrations VALUES(20240710145043,'input fee','2025-06-13 20:01:04',1,X'422d4ce6a1d94c2df4a7fd9400c3d45db35953e53ba46025df7d3ed4d373e04f948468dcbcd8155829a5441f8b46d7f3',302916);
INSERT INTO _sqlx_migrations VALUES(20240711183109,'derivation path index','2025-06-13 20:01:04',1,X'83651c857135516fd578c5ee9f179a04964dc9a366a5b698c1cb54f2b5aa139dc912d34e28c5ff4cc157e6991032952f',225125);
INSERT INTO _sqlx_migrations VALUES(20240718203721,'allow unspent','2025-06-13 20:01:04',1,X'9b900846657b9083cdeca3da6ca7d74487c400f715f7d455c6a662de6b60e2761c3d80ea67d820e9b1ec9fbfd596e267',776167);
INSERT INTO _sqlx_migrations VALUES(20240811031111,'update mint url','2025-06-13 20:01:04',1,X'b8d771e08d3bbe3fc1e8beb1674714f0306d7f9f7cc09990fc0215850179a64366c8c46305ea0c1fb5dbc73a5fe48207',79334);
INSERT INTO _sqlx_migrations VALUES(20240919103407,'proofs quote id','2025-06-13 20:01:04',1,X'e3df13daebbc7df1907c68963258ad3722a0f2398f5ee1e92ea1824ce1a22f5657411f9c08a1f72bfd250e40630fdca5',387875);
INSERT INTO _sqlx_migrations VALUES(20240923153640,'melt requests','2025-06-13 20:01:04',1,X'8c35d740fbb1c0c13dc4594da50cce3e066cba2ff3926a5527629207678afe3a4fa3b7c8f5fab7e08525c676a4098154',188958);
INSERT INTO _sqlx_migrations VALUES(20240930101140,'dleq for sigs','2025-06-13 20:01:04',1,X'23c61a60db9bb145c238bb305583ccc025cd17958e61a6ff97ef0e4385517fe87729f77de0c26ce9cfa3a0c70b273038',383542);
INSERT INTO _sqlx_migrations VALUES(20241108093102,'mint mint quote pubkey','2025-06-13 20:01:04',1,X'00c83af91dc109368fcdc9a1360e1c893afcac3a649c7dfd04e841f1f8fe3d0e99a2ade6891ab752e1b942a738ac6b44',246875);
INSERT INTO _sqlx_migrations VALUES(20250103201327,'amount to pay msats','2025-06-13 20:01:04',1,X'4cc8bd34aec65365271e2dc2a19735403c8551dbf738b541659399c900fb167577d3f02b1988679e6c7922fe018b9a32',235041);
INSERT INTO _sqlx_migrations VALUES(20250129200912,'remove mint url','2025-06-13 20:01:04',1,X'f86b07a6b816683d72bdad637502a47cdeb21f6535aa8e2c0647d4b29f4f58931683b72062b3e313a5936264876bb2c3',638084);
INSERT INTO _sqlx_migrations VALUES(20250129230326,'add config table','2025-06-13 20:01:04',1,X'c232f4cfa032105cdd48097197d7fb0eea290a593af0996434c3f1f5396efb41d1f225592b292367fd9d584672a347d8',163625);
INSERT INTO _sqlx_migrations VALUES(20250307213652,'keyset id as foreign key','2025-06-13 20:01:04',1,X'50a36140780074b2730d429d664c2a7593f2c2237c1a36ed2a11e22c40bfa40b24dc3a5c8089959fae955fdbe2f06533',1498459);
INSERT INTO _sqlx_migrations VALUES(20250406091754,'mint time of quotes','2025-06-13 20:01:04',1,X'ac0165a8371cf7ad424be08c0e6931e1dd1249354ea0e33b4a04ff48ab4188da105e1fd763c42f06aeb733eb33d85415',934250);
INSERT INTO _sqlx_migrations VALUES(20250406093755,'mint created time signature','2025-06-13 20:01:04',1,X'7f2ff8e30f66ab142753cc2e0faec89560726d96298e9ce0c9e871974300fcbe7c2f8a9b2d48ed4ca8daf1b9a5043e95',447000);
INSERT INTO _sqlx_migrations VALUES(20250415093121,'drop keystore foreign','2025-06-13 20:01:04',1,X'efa99131d37335d64c86680c9e5b1362c2bf4d03fbdb6f60c9160edc572add6422d871f76a245d6f55f7fb6f4491b825',1375084);
CREATE TABLE keyset (
id TEXT PRIMARY KEY,
unit TEXT NOT NULL,
active BOOL NOT NULL,
valid_from INTEGER NOT NULL,
valid_to INTEGER,
derivation_path TEXT NOT NULL,
max_order INTEGER NOT NULL
, input_fee_ppk INTEGER, derivation_path_index INTEGER);
INSERT INTO keyset VALUES('0083a60439303340','sat',1,1749844864,NULL,'0''/0''/0''',32,0,0);
INSERT INTO keyset VALUES('00b13456b2934304','auth',1,1749844864,NULL,'0''/4''/0''',1,0,0);
INSERT INTO keyset VALUES('0002c733628bb92f','usd',1,1749844864,NULL,'0''/2''/0''',32,0,0);
CREATE TABLE mint_quote (
id TEXT PRIMARY KEY,
amount INTEGER NOT NULL,
unit TEXT NOT NULL,
request TEXT NOT NULL,
expiry INTEGER NOT NULL
, state TEXT CHECK ( state IN ('UNPAID', 'PENDING', 'PAID', 'ISSUED' ) ) NOT NULL DEFAULT 'UNPAID', request_lookup_id TEXT, pubkey TEXT, created_time INTEGER NOT NULL DEFAULT 0, paid_time INTEGER, issued_time INTEGER);
CREATE TABLE melt_quote (
id TEXT PRIMARY KEY,
unit TEXT NOT NULL,
amount INTEGER NOT NULL,
request TEXT NOT NULL,
fee_reserve INTEGER NOT NULL,
expiry INTEGER NOT NULL
, state TEXT CHECK ( state IN ('UNPAID', 'PENDING', 'PAID' ) ) NOT NULL DEFAULT 'UNPAID', payment_preimage TEXT, request_lookup_id TEXT, msat_to_pay INTEGER, created_time INTEGER NOT NULL DEFAULT 0, paid_time INTEGER);
CREATE TABLE melt_request (
id TEXT PRIMARY KEY,
inputs TEXT NOT NULL,
outputs TEXT,
method TEXT NOT NULL,
unit TEXT NOT NULL
);
CREATE TABLE config (
id TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS "proof" (
y BYTEA PRIMARY KEY,
amount INTEGER NOT NULL,
keyset_id TEXT NOT NULL, -- no FK constraint here
secret TEXT NOT NULL,
c BYTEA NOT NULL,
witness TEXT,
state TEXT CHECK (state IN ('SPENT', 'PENDING', 'UNSPENT', 'RESERVED', 'UNKNOWN')) NOT NULL,
quote_id TEXT,
created_time INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS "blind_signature" (
y BYTEA PRIMARY KEY,
amount INTEGER NOT NULL,
keyset_id TEXT NOT NULL, -- FK removed
c BYTEA NOT NULL,
dleq_e TEXT,
dleq_s TEXT,
quote_id TEXT,
created_time INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX unit_index ON keyset(unit);
CREATE INDEX active_index ON keyset(active);
CREATE INDEX request_index ON mint_quote(request);
CREATE INDEX expiry_index ON mint_quote(expiry);
CREATE INDEX melt_quote_state_index ON melt_quote(state);
CREATE INDEX mint_quote_state_index ON mint_quote(state);
CREATE UNIQUE INDEX unique_request_lookup_id_mint ON mint_quote(request_lookup_id);
CREATE UNIQUE INDEX unique_request_lookup_id_melt ON melt_quote(request_lookup_id);
COMMIT;

View File

@@ -13,18 +13,19 @@ readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["mint", "wallet", "auth"]
mint = ["cdk-common/mint"]
wallet = ["cdk-common/wallet"]
auth = ["cdk-common/auth"]
mint = ["cdk-common/mint", "cdk-sql-common/mint"]
wallet = ["cdk-common/wallet", "cdk-sql-common/wallet"]
auth = ["cdk-common/auth", "cdk-sql-common/auth"]
sqlcipher = ["rusqlite/bundled-sqlcipher"]
[dependencies]
async-trait.workspace = true
cdk-common = { workspace = true, features = ["test"] }
bitcoin.workspace = true
cdk-sql-common = { workspace = true }
rusqlite = { version = "0.31", features = ["bundled"]}
thiserror.workspace = true
tokio.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"]}
tracing.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -1,12 +1,13 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use rusqlite::{params, Connection};
use crate::pool::{Pool, ResourceManager};
use cdk_sql_common::pool::{self, Pool, ResourceManager};
use cdk_sql_common::value::Value;
use rusqlite::Connection;
/// The config need to create a new SQLite connection
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Config {
path: Option<String>,
password: Option<String>,
@@ -25,7 +26,9 @@ impl ResourceManager for SqliteConnectionManager {
fn new_resource(
config: &Self::Config,
) -> Result<Self::Resource, crate::pool::Error<Self::Error>> {
_stale: Arc<AtomicBool>,
_timeout: Duration,
) -> Result<Self::Resource, pool::Error<Self::Error>> {
let conn = if let Some(path) = config.path.as_ref() {
Connection::open(path)?
} else {
@@ -57,14 +60,8 @@ impl ResourceManager for SqliteConnectionManager {
/// For SQLCipher support, enable the "sqlcipher" feature and pass a password.
pub fn create_sqlite_pool(
path: &str,
#[cfg(feature = "sqlcipher")] password: String,
password: Option<String>,
) -> Arc<Pool<SqliteConnectionManager>> {
#[cfg(feature = "sqlcipher")]
let password = Some(password);
#[cfg(not(feature = "sqlcipher"))]
let password = None;
let (config, max_size) = if path.contains(":memory:") {
(
Config {
@@ -86,52 +83,26 @@ pub fn create_sqlite_pool(
Pool::new(config, max_size, Duration::from_secs(10))
}
/// Migrates the migration generated by `build.rs`
pub fn migrate(conn: &mut Connection, migrations: &[(&str, &str)]) -> Result<(), rusqlite::Error> {
let tx = conn.transaction()?;
tx.execute(
r#"
CREATE TABLE IF NOT EXISTS migrations (
name TEXT PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"#,
[],
)?;
if tx.query_row(
r#"select count(*) from sqlite_master where name = '_sqlx_migrations'"#,
[],
|row| row.get::<_, i32>(0),
)? == 1
{
tx.execute_batch(
r#"
INSERT INTO migrations
SELECT
version || '_' || REPLACE(description, ' ', '_') || '.sql',
execution_time
FROM _sqlx_migrations;
DROP TABLE _sqlx_migrations;
"#,
)?;
/// Convert cdk_sql_common::value::Value to rusqlite Value
#[inline(always)]
pub fn to_sqlite(v: Value) -> rusqlite::types::Value {
match v {
Value::Blob(blob) => rusqlite::types::Value::Blob(blob),
Value::Integer(i) => rusqlite::types::Value::Integer(i),
Value::Null => rusqlite::types::Value::Null,
Value::Text(t) => rusqlite::types::Value::Text(t),
Value::Real(r) => rusqlite::types::Value::Real(r),
}
}
/// Convert from rusqlite Value to cdk_sql_common::value::Value
#[inline(always)]
pub fn from_sqlite(v: rusqlite::types::Value) -> Value {
match v {
rusqlite::types::Value::Blob(blob) => Value::Blob(blob),
rusqlite::types::Value::Integer(i) => Value::Integer(i),
rusqlite::types::Value::Null => Value::Null,
rusqlite::types::Value::Text(t) => Value::Text(t),
rusqlite::types::Value::Real(r) => Value::Real(r),
}
// Apply each migration if it hasn't been applied yet
for (name, sql) in migrations {
let already_applied: bool = tx.query_row(
"SELECT EXISTS(SELECT 1 FROM migrations WHERE name = ?1)",
params![name],
|row| row.get(0),
)?;
if !already_applied {
tx.execute_batch(sql)?;
tx.execute("INSERT INTO migrations (name) VALUES (?1)", params![name])?;
}
}
tx.commit()?;
Ok(())
}

View File

@@ -4,9 +4,6 @@
#![warn(rustdoc::bare_urls)]
mod common;
mod macros;
mod pool;
mod stmt;
#[cfg(feature = "mint")]
pub mod mint;

View File

@@ -1,16 +1,20 @@
//! Async, pipelined rusqlite client
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc as std_mpsc, Arc, Mutex};
use std::thread::spawn;
use std::time::Instant;
use cdk_common::database::Error;
use cdk_sql_common::database::{DatabaseConnector, DatabaseExecutor, DatabaseTransaction};
use cdk_sql_common::pool::{self, Pool, PooledResource};
use cdk_sql_common::stmt::{Column, ExpectedSqlResponse, Statement as InnerStatement};
use cdk_sql_common::ConversionError;
use rusqlite::{ffi, Connection, ErrorCode, TransactionBehavior};
use tokio::sync::{mpsc, oneshot};
use crate::common::SqliteConnectionManager;
use crate::mint::Error;
use crate::pool::{Pool, PooledResource};
use crate::stmt::{Column, ExpectedSqlResponse, Statement as InnerStatement, Value};
use crate::common::{create_sqlite_pool, from_sqlite, to_sqlite, SqliteConnectionManager};
/// The number of queued SQL statements before it start failing
const SQL_QUEUE_SIZE: usize = 10_000;
@@ -25,9 +29,57 @@ pub struct AsyncRusqlite {
inflight_requests: Arc<AtomicUsize>,
}
impl From<PathBuf> for AsyncRusqlite {
fn from(value: PathBuf) -> Self {
AsyncRusqlite::new(create_sqlite_pool(value.to_str().unwrap_or_default(), None))
}
}
impl From<&str> for AsyncRusqlite {
fn from(value: &str) -> Self {
AsyncRusqlite::new(create_sqlite_pool(value, None))
}
}
impl From<(&str, &str)> for AsyncRusqlite {
fn from((value, pass): (&str, &str)) -> Self {
AsyncRusqlite::new(create_sqlite_pool(value, Some(pass.to_owned())))
}
}
impl From<(PathBuf, &str)> for AsyncRusqlite {
fn from((value, pass): (PathBuf, &str)) -> Self {
AsyncRusqlite::new(create_sqlite_pool(
value.to_str().unwrap_or_default(),
Some(pass.to_owned()),
))
}
}
impl From<(&str, String)> for AsyncRusqlite {
fn from((value, pass): (&str, String)) -> Self {
AsyncRusqlite::new(create_sqlite_pool(value, Some(pass)))
}
}
impl From<(PathBuf, String)> for AsyncRusqlite {
fn from((value, pass): (PathBuf, String)) -> Self {
AsyncRusqlite::new(create_sqlite_pool(
value.to_str().unwrap_or_default(),
Some(pass),
))
}
}
impl From<&PathBuf> for AsyncRusqlite {
fn from(value: &PathBuf) -> Self {
AsyncRusqlite::new(create_sqlite_pool(value.to_str().unwrap_or_default(), None))
}
}
/// Internal request for the database thread
#[derive(Debug)]
pub enum DbRequest {
enum DbRequest {
Sql(InnerStatement, oneshot::Sender<DbResponse>),
Begin(oneshot::Sender<DbResponse>),
Commit(oneshot::Sender<DbResponse>),
@@ -35,97 +87,67 @@ pub enum DbRequest {
}
#[derive(Debug)]
pub enum DbResponse {
enum DbResponse {
Transaction(mpsc::Sender<DbRequest>),
AffectedRows(usize),
Pluck(Option<Column>),
Row(Option<Vec<Column>>),
Rows(Vec<Vec<Column>>),
Error(Error),
Error(SqliteError),
Unexpected,
Ok,
}
/// Statement for the async_rusqlite wrapper
pub struct Statement(InnerStatement);
#[derive(thiserror::Error, Debug)]
enum SqliteError {
#[error(transparent)]
Sqlite(#[from] rusqlite::Error),
impl Statement {
/// Bind a variable
pub fn bind<C, V>(self, name: C, value: V) -> Self
where
C: ToString,
V: Into<Value>,
{
Self(self.0.bind(name, value))
}
#[error(transparent)]
Inner(#[from] Error),
/// Bind vec
pub fn bind_vec<C, V>(self, name: C, value: Vec<V>) -> Self
where
C: ToString,
V: Into<Value>,
{
Self(self.0.bind_vec(name, value))
}
#[error(transparent)]
Pool(#[from] pool::Error<rusqlite::Error>),
/// Executes a query and return the number of affected rows
pub async fn execute<C>(self, conn: &C) -> Result<usize, Error>
where
C: DatabaseExecutor + Send + Sync,
{
conn.execute(self.0).await
}
/// Duplicate entry
#[error("Duplicate")]
Duplicate,
/// Returns the first column of the first row of the query result
pub async fn pluck<C>(self, conn: &C) -> Result<Option<Column>, Error>
where
C: DatabaseExecutor + Send + Sync,
{
conn.pluck(self.0).await
}
#[error(transparent)]
Conversion(#[from] ConversionError),
}
/// Returns the first row of the query result
pub async fn fetch_one<C>(self, conn: &C) -> Result<Option<Vec<Column>>, Error>
where
C: DatabaseExecutor + Send + Sync,
{
conn.fetch_one(self.0).await
}
/// Returns all rows of the query result
pub async fn fetch_all<C>(self, conn: &C) -> Result<Vec<Vec<Column>>, Error>
where
C: DatabaseExecutor + Send + Sync,
{
conn.fetch_all(self.0).await
impl From<SqliteError> for Error {
fn from(val: SqliteError) -> Self {
match val {
SqliteError::Duplicate => Error::Duplicate,
SqliteError::Conversion(e) => e.into(),
o => Error::Internal(o.to_string()),
}
}
}
/// Process a query
#[inline(always)]
fn process_query(conn: &Connection, sql: InnerStatement) -> Result<DbResponse, Error> {
fn process_query(conn: &Connection, statement: InnerStatement) -> Result<DbResponse, SqliteError> {
let start = Instant::now();
let mut args = sql.args;
let mut stmt = conn.prepare_cached(&sql.sql)?;
let total_parameters = stmt.parameter_count();
let expected_response = statement.expected_response;
let (sql, placeholder_values) = statement.to_sql()?;
let sql = sql.trim_end_matches("FOR UPDATE");
for index in 1..=total_parameters {
let value = if let Some(value) = stmt.parameter_name(index).map(|name| {
args.remove(name)
.ok_or(Error::MissingParameter(name.to_owned()))
}) {
value?
} else {
continue;
};
stmt.raw_bind_parameter(index, value)?;
let mut stmt = conn.prepare_cached(sql)?;
for (i, value) in placeholder_values.into_iter().enumerate() {
stmt.raw_bind_parameter(i + 1, to_sqlite(value))?;
}
let columns = stmt.column_count();
let to_return = match sql.expected_response {
let to_return = match expected_response {
ExpectedSqlResponse::AffectedRows => DbResponse::AffectedRows(stmt.raw_execute()?),
ExpectedSqlResponse::Batch => {
conn.execute_batch(sql)?;
DbResponse::Ok
}
ExpectedSqlResponse::ManyRows => {
let mut rows = stmt.raw_query();
let mut results = vec![];
@@ -133,7 +155,7 @@ fn process_query(conn: &Connection, sql: InnerStatement) -> Result<DbResponse, E
while let Some(row) = rows.next()? {
results.push(
(0..columns)
.map(|i| row.get(i))
.map(|i| row.get(i).map(from_sqlite))
.collect::<Result<Vec<_>, _>>()?,
)
}
@@ -142,7 +164,11 @@ fn process_query(conn: &Connection, sql: InnerStatement) -> Result<DbResponse, E
}
ExpectedSqlResponse::Pluck => {
let mut rows = stmt.raw_query();
DbResponse::Pluck(rows.next()?.map(|row| row.get(0usize)).transpose()?)
DbResponse::Pluck(
rows.next()?
.map(|row| row.get(0usize).map(from_sqlite))
.transpose()?,
)
}
ExpectedSqlResponse::SingleRow => {
let mut rows = stmt.raw_query();
@@ -150,7 +176,7 @@ fn process_query(conn: &Connection, sql: InnerStatement) -> Result<DbResponse, E
.next()?
.map(|row| {
(0..columns)
.map(|i| row.get(i))
.map(|i| row.get(i).map(from_sqlite))
.collect::<Result<Vec<_>, _>>()
})
.transpose()?;
@@ -161,7 +187,7 @@ fn process_query(conn: &Connection, sql: InnerStatement) -> Result<DbResponse, E
let duration = start.elapsed();
if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), sql.sql);
tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), sql);
}
Ok(to_return)
@@ -196,13 +222,12 @@ fn rusqlite_spawn_worker_threads(
let inflight_requests = inflight_requests.clone();
spawn(move || loop {
while let Ok((conn, sql, reply_to)) = rx.lock().expect("failed to acquire").recv() {
tracing::trace!("Execute query: {}", sql.sql);
let result = process_query(&conn, sql);
let _ = match result {
Ok(ok) => reply_to.send(ok),
Err(err) => {
tracing::error!("Failed query with error {:?}", err);
let err = if let Error::Sqlite(rusqlite::Error::SqliteFailure(
let err = if let SqliteError::Sqlite(rusqlite::Error::SqliteFailure(
ffi::Error {
code,
extended_code,
@@ -214,7 +239,7 @@ fn rusqlite_spawn_worker_threads(
&& (*extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY
|| *extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE)
{
Error::Duplicate
SqliteError::Duplicate
} else {
err
}
@@ -256,7 +281,7 @@ fn rusqlite_worker_manager(
while let Some(request) = receiver.blocking_recv() {
inflight_requests.fetch_add(1, Ordering::Relaxed);
match request {
DbRequest::Sql(sql, reply_to) => {
DbRequest::Sql(statement, reply_to) => {
let conn = match pool.get() {
Ok(conn) => conn,
Err(err) => {
@@ -267,7 +292,7 @@ fn rusqlite_worker_manager(
}
};
let _ = send_sql_to_thread.send((conn, sql, reply_to));
let _ = send_sql_to_thread.send((conn, statement, reply_to));
continue;
}
DbRequest::Begin(reply_to) => {
@@ -341,9 +366,9 @@ fn rusqlite_worker_manager(
DbRequest::Begin(reply_to) => {
let _ = reply_to.send(DbResponse::Unexpected);
}
DbRequest::Sql(sql, reply_to) => {
tracing::trace!("Tx {}: SQL {}", tx_id, sql.sql);
let _ = match process_query(&tx, sql) {
DbRequest::Sql(statement, reply_to) => {
tracing::trace!("Tx {}: SQL {:?}", tx_id, statement);
let _ = match process_query(&tx, statement) {
Ok(ok) => reply_to.send(ok),
Err(err) => {
tracing::error!(
@@ -351,7 +376,7 @@ fn rusqlite_worker_manager(
tx_id,
err
);
let err = if let Error::Sqlite(
let err = if let SqliteError::Sqlite(
rusqlite::Error::SqliteFailure(
ffi::Error {
code,
@@ -365,7 +390,7 @@ fn rusqlite_worker_manager(
&& (*extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY
|| *extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE)
{
Error::Duplicate
SqliteError::Duplicate
} else {
err
}
@@ -395,83 +420,6 @@ fn rusqlite_worker_manager(
}
}
/// Executes SQL statements by queueing them to the database worker thread.
///
/// Implementors only provide [`Self::get_queue_sender`]; the provided methods
/// tag each statement with the expected response shape, send it over the
/// channel, and await the worker's reply on a oneshot channel.
#[async_trait::async_trait]
pub trait DatabaseExecutor {
/// Returns the connection to the database thread (or the on-going transaction)
fn get_queue_sender(&self) -> mpsc::Sender<DbRequest>;
/// Executes a query and returns the affected rows
async fn execute(&self, mut statement: InnerStatement) -> Result<usize, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::AffectedRows;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Communication)?;
match receiver.await.map_err(|_| Error::Communication)? {
DbResponse::AffectedRows(n) => Ok(n),
DbResponse::Error(err) => Err(err),
_ => Err(Error::InvalidDbResponse),
}
}
/// Runs the query and returns the first row or None
async fn fetch_one(&self, mut statement: InnerStatement) -> Result<Option<Vec<Column>>, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::SingleRow;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Communication)?;
match receiver.await.map_err(|_| Error::Communication)? {
DbResponse::Row(row) => Ok(row),
DbResponse::Error(err) => Err(err),
_ => Err(Error::InvalidDbResponse),
}
}
/// Runs the query and returns all matching rows.
async fn fetch_all(&self, mut statement: InnerStatement) -> Result<Vec<Vec<Column>>, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::ManyRows;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Communication)?;
match receiver.await.map_err(|_| Error::Communication)? {
DbResponse::Rows(rows) => Ok(rows),
DbResponse::Error(err) => Err(err),
_ => Err(Error::InvalidDbResponse),
}
}
/// Runs the query and returns only the first column of the first row, if any.
async fn pluck(&self, mut statement: InnerStatement) -> Result<Option<Column>, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::Pluck;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Communication)?;
match receiver.await.map_err(|_| Error::Communication)? {
DbResponse::Pluck(value) => Ok(value),
DbResponse::Error(err) => Err(err),
_ => Err(Error::InvalidDbResponse),
}
}
}
/// Convenience constructor: wraps the given SQL text in a [`Statement`] builder.
#[inline(always)]
pub fn query<T: ToString>(sql: T) -> Statement {
    Statement(crate::stmt::Statement::new(sql))
}
impl AsyncRusqlite {
/// Creates a new Async Rusqlite wrapper.
pub fn new(pool: Arc<Pool<SqliteConnectionManager>>) -> Self {
@@ -488,45 +436,155 @@ impl AsyncRusqlite {
}
}
fn get_queue_sender(&self) -> &mpsc::Sender<DbRequest> {
&self.sender
}
/// Show how many inflight requests
#[allow(dead_code)]
pub fn inflight_requests(&self) -> usize {
self.inflight_requests.load(Ordering::Relaxed)
}
}
#[async_trait::async_trait]
impl DatabaseConnector for AsyncRusqlite {
type Transaction<'a> = Transaction<'a>;
/// Begins a transaction
///
/// If the transaction is Drop it will trigger a rollback operation
pub async fn begin(&self) -> Result<Transaction<'_>, Error> {
async fn begin(&self) -> Result<Self::Transaction<'_>, Error> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(DbRequest::Begin(sender))
.await
.map_err(|_| Error::Communication)?;
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver.await.map_err(|_| Error::Communication)? {
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Transaction(db_sender) => Ok(Transaction {
db_sender,
_marker: PhantomData,
}),
DbResponse::Error(err) => Err(err),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
}
#[async_trait::async_trait]
impl DatabaseExecutor for AsyncRusqlite {
#[inline(always)]
fn get_queue_sender(&self) -> mpsc::Sender<DbRequest> {
self.sender.clone()
fn name() -> &'static str {
"sqlite"
}
/// Queues the statement for the worker thread and awaits the first row, if any.
// A failed channel send/receive means the worker thread is gone; surfaced as Internal.
async fn fetch_one(&self, mut statement: InnerStatement) -> Result<Option<Vec<Column>>, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::SingleRow;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Row(row) => Ok(row),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
/// Queues the statement for batch execution (multiple semicolon-separated statements).
async fn batch(&self, mut statement: InnerStatement) -> Result<(), Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::Batch;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Ok => Ok(()),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
/// Queues the statement for the worker thread and awaits all matching rows.
async fn fetch_all(&self, mut statement: InnerStatement) -> Result<Vec<Vec<Column>>, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::ManyRows;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Rows(row) => Ok(row),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
/// Queues the statement for execution and awaits the number of affected rows.
async fn execute(&self, mut statement: InnerStatement) -> Result<usize, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::AffectedRows;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::AffectedRows(total) => Ok(total),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
/// Queues the statement and awaits only the first column of the first row, if any.
async fn pluck(&self, mut statement: InnerStatement) -> Result<Option<Column>, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::Pluck;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Pluck(value) => Ok(value),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
}
/// Database transaction
///
/// Holds the channel to the worker thread that owns the on-going SQLite
/// transaction. Dropping it without an explicit commit is intended to roll
/// back (see the `Drop` impl).
#[derive(Debug)]
pub struct Transaction<'conn> {
// Queue to the worker thread currently holding the open transaction.
db_sender: mpsc::Sender<DbRequest>,
// Ties the transaction to the connection's lifetime without storing a reference.
_marker: PhantomData<&'conn ()>,
}
impl Transaction<'_> {
/// Internal accessor for the worker-thread queue backing this transaction.
fn get_queue_sender(&self) -> &mpsc::Sender<DbRequest> {
&self.db_sender
}
}
impl Drop for Transaction<'_> {
fn drop(&mut self) {
let (sender, _) = oneshot::channel();
@@ -534,40 +592,136 @@ impl Drop for Transaction<'_> {
}
}
impl Transaction<'_> {
pub async fn commit(self) -> Result<(), Error> {
#[async_trait::async_trait]
impl<'a> DatabaseTransaction<'a> for Transaction<'a> {
async fn commit(self) -> Result<(), Error> {
let (sender, receiver) = oneshot::channel();
self.db_sender
.send(DbRequest::Commit(sender))
.await
.map_err(|_| Error::Communication)?;
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver.await.map_err(|_| Error::Communication)? {
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Ok => Ok(()),
DbResponse::Error(err) => Err(err),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
pub async fn rollback(self) -> Result<(), Error> {
async fn rollback(self) -> Result<(), Error> {
let (sender, receiver) = oneshot::channel();
self.db_sender
.send(DbRequest::Rollback(sender))
.await
.map_err(|_| Error::Communication)?;
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver.await.map_err(|_| Error::Communication)? {
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Ok => Ok(()),
DbResponse::Error(err) => Err(err),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
}
#[async_trait::async_trait]
impl DatabaseExecutor for Transaction<'_> {
/// Get the internal sender to the SQL queue
#[inline(always)]
fn get_queue_sender(&self) -> mpsc::Sender<DbRequest> {
self.db_sender.clone()
fn name() -> &'static str {
"sqlite"
}
/// Runs the statement inside this open transaction and returns the first row, if any.
async fn fetch_one(&self, mut statement: InnerStatement) -> Result<Option<Vec<Column>>, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::SingleRow;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Row(row) => Ok(row),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
/// Runs a batch of statements inside this open transaction.
async fn batch(&self, mut statement: InnerStatement) -> Result<(), Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::Batch;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Ok => Ok(()),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
/// Runs the statement inside this open transaction and returns all matching rows.
async fn fetch_all(&self, mut statement: InnerStatement) -> Result<Vec<Vec<Column>>, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::ManyRows;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Rows(row) => Ok(row),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
/// Runs the statement inside this open transaction and returns the affected row count.
async fn execute(&self, mut statement: InnerStatement) -> Result<usize, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::AffectedRows;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::AffectedRows(total) => Ok(total),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
/// Runs the statement inside this open transaction; returns the first column of the first row.
async fn pluck(&self, mut statement: InnerStatement) -> Result<Option<Column>, Error> {
let (sender, receiver) = oneshot::channel();
statement.expected_response = ExpectedSqlResponse::Pluck;
self.get_queue_sender()
.send(DbRequest::Sql(statement, sender))
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?;
match receiver
.await
.map_err(|_| Error::Internal("Communication".to_owned()))?
{
DbResponse::Pluck(value) => Ok(value),
DbResponse::Error(err) => Err(err.into()),
_ => Err(Error::InvalidDbResponse),
}
}
}

View File

@@ -1,5 +0,0 @@
// @generated
// Auto-generated by build.rs
// NOTE(review): generated (name, SQL source) pairs — edit the files under
// ./migrations and rebuild rather than editing this list by hand.
pub static MIGRATIONS: &[(&str, &str)] = &[
("20250109143347_init.sql", include_str!(r#"./migrations/20250109143347_init.sql"#)),
];

View File

@@ -1,116 +0,0 @@
//! SQLite Database Error
use thiserror::Error;
/// SQLite Database Error
#[derive(Debug, Error)]
pub enum Error {
/// Rusqlite (SQLite driver) error, forwarded transparently
#[error(transparent)]
Sqlite(#[from] rusqlite::Error),
/// Duplicate entry
#[error("Record already exists")]
Duplicate,
/// Pool error
#[error(transparent)]
Pool(#[from] crate::pool::Error<rusqlite::Error>),
/// Invalid UUID
#[error("Invalid UUID: {0}")]
InvalidUuid(String),
/// QuoteNotFound
#[error("Quote not found")]
QuoteNotFound,
/// Missing named parameter
#[error("Missing named parameter {0}")]
MissingParameter(String),
/// Communication error with the database
#[error("Internal communication error")]
Communication,
/// Invalid response from the database thread
#[error("Unexpected database response")]
InvalidDbResponse,
/// Invalid db type
#[error("Invalid type from db, expected {0} got {1}")]
InvalidType(String, String),
/// Missing columns
#[error("Not enough elements: expected {0}, got {1}")]
MissingColumn(usize, usize),
/// Invalid data conversion in column (source value, target type)
#[error("Error converting {0} to {1}")]
InvalidConversion(String, String),
/// NUT00 Error
#[error(transparent)]
CDKNUT00(#[from] cdk_common::nuts::nut00::Error),
/// NUT01 Error
#[error(transparent)]
CDKNUT01(#[from] cdk_common::nuts::nut01::Error),
/// NUT02 Error
#[error(transparent)]
CDKNUT02(#[from] cdk_common::nuts::nut02::Error),
/// NUT04 Error
#[error(transparent)]
CDKNUT04(#[from] cdk_common::nuts::nut04::Error),
/// NUT05 Error
#[error(transparent)]
CDKNUT05(#[from] cdk_common::nuts::nut05::Error),
/// NUT07 Error
#[error(transparent)]
CDKNUT07(#[from] cdk_common::nuts::nut07::Error),
/// NUT23 Error
#[error(transparent)]
CDKNUT23(#[from] cdk_common::nuts::nut23::Error),
/// Secret Error
#[error(transparent)]
CDKSECRET(#[from] cdk_common::secret::Error),
/// BIP32 Error
#[error(transparent)]
BIP32(#[from] bitcoin::bip32::Error),
/// Mint Url Error
#[error(transparent)]
MintUrl(#[from] cdk_common::mint_url::Error),
/// Could Not Initialize Database
#[error("Could not initialize database")]
CouldNotInitialize,
/// Invalid Database Path
#[error("Invalid database path")]
InvalidDbPath,
/// Serde Error
#[error(transparent)]
Serde(#[from] serde_json::Error),
/// Unknown Mint Info
#[error("Unknown mint info")]
UnknownMintInfo,
/// Unknown quote TTL
#[error("Unknown quote TTL")]
UnknownQuoteTTL,
/// Unknown config key
#[error("Unknown config key: {0}")]
UnknownConfigKey(String),
/// Proof not found
#[error("Proof not found")]
ProofNotFound,
/// Invalid keyset ID
#[error("Invalid keyset ID")]
InvalidKeysetId,
/// Invalid melt payment request
#[error("Invalid melt payment request")]
InvalidMeltPaymentRequest,
}
impl From<Error> for cdk_common::database::Error {
fn from(e: Error) -> Self {
match e {
Error::Duplicate => Self::Duplicate,
e => Self::Database(Box::new(e)),
}
}
}

View File

@@ -11,10 +11,11 @@ use super::MintSqliteDatabase;
/// Creates a new in-memory [`MintSqliteDatabase`] instance
pub async fn empty() -> Result<MintSqliteDatabase, database::Error> {
#[cfg(not(feature = "sqlcipher"))]
let db = MintSqliteDatabase::new(":memory:").await?;
let path = ":memory:";
#[cfg(feature = "sqlcipher")]
let db = MintSqliteDatabase::new(":memory:", "memory".to_string()).await?;
Ok(db)
let path = (":memory:", "memory");
MintSqliteDatabase::new(path).await
}
/// Creates a new in-memory [`MintSqliteDatabase`] instance with the given state

View File

@@ -1,25 +0,0 @@
// @generated
// Auto-generated by build.rs
// NOTE(review): generated (name, SQL source) pairs — edit the files under
// ./migrations and rebuild rather than editing this list by hand.
pub static MIGRATIONS: &[(&str, &str)] = &[
("20240612124932_init.sql", include_str!(r#"./migrations/20240612124932_init.sql"#)),
("20240618195700_quote_state.sql", include_str!(r#"./migrations/20240618195700_quote_state.sql"#)),
("20240626092101_nut04_state.sql", include_str!(r#"./migrations/20240626092101_nut04_state.sql"#)),
("20240703122347_request_lookup_id.sql", include_str!(r#"./migrations/20240703122347_request_lookup_id.sql"#)),
("20240710145043_input_fee.sql", include_str!(r#"./migrations/20240710145043_input_fee.sql"#)),
("20240711183109_derivation_path_index.sql", include_str!(r#"./migrations/20240711183109_derivation_path_index.sql"#)),
("20240718203721_allow_unspent.sql", include_str!(r#"./migrations/20240718203721_allow_unspent.sql"#)),
("20240811031111_update_mint_url.sql", include_str!(r#"./migrations/20240811031111_update_mint_url.sql"#)),
("20240919103407_proofs_quote_id.sql", include_str!(r#"./migrations/20240919103407_proofs_quote_id.sql"#)),
("20240923153640_melt_requests.sql", include_str!(r#"./migrations/20240923153640_melt_requests.sql"#)),
("20240930101140_dleq_for_sigs.sql", include_str!(r#"./migrations/20240930101140_dleq_for_sigs.sql"#)),
("20241108093102_mint_mint_quote_pubkey.sql", include_str!(r#"./migrations/20241108093102_mint_mint_quote_pubkey.sql"#)),
("20250103201327_amount_to_pay_msats.sql", include_str!(r#"./migrations/20250103201327_amount_to_pay_msats.sql"#)),
("20250129200912_remove_mint_url.sql", include_str!(r#"./migrations/20250129200912_remove_mint_url.sql"#)),
("20250129230326_add_config_table.sql", include_str!(r#"./migrations/20250129230326_add_config_table.sql"#)),
("20250307213652_keyset_id_as_foreign_key.sql", include_str!(r#"./migrations/20250307213652_keyset_id_as_foreign_key.sql"#)),
("20250406091754_mint_time_of_quotes.sql", include_str!(r#"./migrations/20250406091754_mint_time_of_quotes.sql"#)),
("20250406093755_mint_created_time_signature.sql", include_str!(r#"./migrations/20250406093755_mint_created_time_signature.sql"#)),
("20250415093121_drop_keystore_foreign.sql", include_str!(r#"./migrations/20250415093121_drop_keystore_foreign.sql"#)),
("20250626120251_rename_blind_message_y_to_b.sql", include_str!(r#"./migrations/20250626120251_rename_blind_message_y_to_b.sql"#)),
("20250706101057_bolt12.sql", include_str!(r#"./migrations/20250706101057_bolt12.sql"#)),
];

File diff suppressed because it is too large Load Diff

View File

@@ -1,184 +0,0 @@
use std::collections::HashMap;
use rusqlite::{self, CachedStatement};
use crate::common::SqliteConnectionManager;
use crate::pool::PooledResource;
/// The Value coming from SQLite
pub type Value = rusqlite::types::Value;
/// A single result-set cell; alias of [`Value`]
pub type Column = Value;
/// Expected response type for a given SQL statement
#[derive(Debug, Clone, Copy, Default)]
pub enum ExpectedSqlResponse {
/// A single row
SingleRow,
/// All the rows that match a query
#[default]
ManyRows,
/// How many rows were affected by the query
AffectedRows,
/// Return the first column of the first row
Pluck,
}
/// A SQL statement together with its named arguments and expected response shape.
#[derive(Default, Debug)]
pub struct Statement {
/// The SQL statement
pub sql: String,
/// The list of arguments for the placeholders. It only supports named arguments for simplicity
/// sake
pub args: HashMap<String, Value>,
/// The expected response type
pub expected_response: ExpectedSqlResponse,
}
impl Statement {
/// Creates a new statement from any value that can render as SQL text.
pub fn new<T>(sql: T) -> Self
where
T: ToString,
{
Self {
sql: sql.to_string(),
..Default::default()
}
}
/// Binds a given placeholder to a value.
#[inline]
pub fn bind<C, V>(mut self, name: C, value: V) -> Self
where
C: ToString,
V: Into<Value>,
{
self.args.insert(name.to_string(), value.into());
self
}
/// Binds a single variable with a vector.
///
/// This will rewrite the function from `:foo` (where value is vec![1, 2, 3]) to `:foo0, :foo1,
/// :foo2` and binds each value from the value vector accordingly.
#[inline]
pub fn bind_vec<C, V>(mut self, name: C, value: Vec<V>) -> Self
where
C: ToString,
V: Into<Value>,
{
let mut new_sql = String::with_capacity(self.sql.len());
let target = name.to_string();
let mut i = 0;
// Register each element as `<name><index>` and build the expanded placeholder list.
let placeholders = value
.into_iter()
.enumerate()
.map(|(key, value)| {
let key = format!("{target}{key}");
self.args.insert(key.clone(), value.into());
key
})
.collect::<Vec<_>>()
.join(",");
// Scan for occurrences of the placeholder and splice in the expanded list.
while let Some(pos) = self.sql[i..].find(&target) {
let abs_pos = i + pos;
let after = abs_pos + target.len();
// Only replace whole placeholders: `:foo` must not match inside `:foobar`.
let is_word_boundary = self.sql[after..]
.chars()
.next()
.map_or(true, |c| !c.is_alphanumeric() && c != '_');
if is_word_boundary {
new_sql.push_str(&self.sql[i..abs_pos]);
new_sql.push_str(&placeholders);
i = after;
} else {
new_sql.push_str(&self.sql[i..=abs_pos]);
i = abs_pos + 1;
}
}
new_sql.push_str(&self.sql[i..]);
self.sql = new_sql;
self
}
// Prepares (with the statement cache) and binds all named arguments;
// an unknown parameter name surfaces as `InvalidColumnName`.
fn get_stmt(
self,
conn: &PooledResource<SqliteConnectionManager>,
) -> rusqlite::Result<CachedStatement<'_>> {
let mut stmt = conn.prepare_cached(&self.sql)?;
for (name, value) in self.args {
let index = stmt
.parameter_index(&name)
.map_err(|_| rusqlite::Error::InvalidColumnName(name.clone()))?
.ok_or(rusqlite::Error::InvalidColumnName(name))?;
stmt.raw_bind_parameter(index, value)?;
}
Ok(stmt)
}
/// Runs the query and returns only the first column of the first row, if any.
/// NOTE(review): `plunk` looks like a typo of `pluck`; renaming would break
/// callers, so it is left as-is.
pub fn plunk(
self,
conn: &PooledResource<SqliteConnectionManager>,
) -> rusqlite::Result<Option<Value>> {
let mut stmt = self.get_stmt(conn)?;
let mut rows = stmt.raw_query();
rows.next()?.map(|row| row.get(0)).transpose()
}
/// Executes a query and returns the affected rows
pub fn execute(
self,
conn: &PooledResource<SqliteConnectionManager>,
) -> rusqlite::Result<usize> {
self.get_stmt(conn)?.raw_execute()
}
/// Runs the query and returns the first row or None
pub fn fetch_one(
self,
conn: &PooledResource<SqliteConnectionManager>,
) -> rusqlite::Result<Option<Vec<Column>>> {
let mut stmt = self.get_stmt(conn)?;
let columns = stmt.column_count();
let mut rows = stmt.raw_query();
rows.next()?
.map(|row| {
(0..columns)
.map(|i| row.get(i))
.collect::<Result<Vec<_>, _>>()
})
.transpose()
}
/// Runs the query and returns all matching rows.
pub fn fetch_all(
self,
conn: &PooledResource<SqliteConnectionManager>,
) -> rusqlite::Result<Vec<Vec<Column>>> {
let mut stmt = self.get_stmt(conn)?;
let columns = stmt.column_count();
let mut rows = stmt.raw_query();
let mut results = vec![];
while let Some(row) = rows.next()? {
results.push(
(0..columns)
.map(|i| row.get(i))
.collect::<Result<Vec<_>, _>>()?,
);
}
Ok(results)
}
}

View File

@@ -7,8 +7,10 @@ use super::WalletSqliteDatabase;
/// Creates a new in-memory [`WalletSqliteDatabase`] instance
pub async fn empty() -> Result<WalletSqliteDatabase, Error> {
#[cfg(not(feature = "sqlcipher"))]
let db = WalletSqliteDatabase::new(":memory:").await?;
let path = ":memory:";
#[cfg(feature = "sqlcipher")]
let db = WalletSqliteDatabase::new(":memory:", "memory".to_owned()).await?;
Ok(db)
let path = (":memory:", "memory");
WalletSqliteDatabase::new(path).await
}

View File

@@ -1,21 +0,0 @@
// @generated
// Auto-generated by build.rs
// NOTE(review): generated (name, SQL source) pairs — edit the files under
// ./migrations and rebuild rather than editing this list by hand.
pub static MIGRATIONS: &[(&str, &str)] = &[
("20240612132920_init.sql", include_str!(r#"./migrations/20240612132920_init.sql"#)),
("20240618200350_quote_state.sql", include_str!(r#"./migrations/20240618200350_quote_state.sql"#)),
("20240626091921_nut04_state.sql", include_str!(r#"./migrations/20240626091921_nut04_state.sql"#)),
("20240710144711_input_fee.sql", include_str!(r#"./migrations/20240710144711_input_fee.sql"#)),
("20240810214105_mint_icon_url.sql", include_str!(r#"./migrations/20240810214105_mint_icon_url.sql"#)),
("20240810233905_update_mint_url.sql", include_str!(r#"./migrations/20240810233905_update_mint_url.sql"#)),
("20240902151515_icon_url.sql", include_str!(r#"./migrations/20240902151515_icon_url.sql"#)),
("20240902210905_mint_time.sql", include_str!(r#"./migrations/20240902210905_mint_time.sql"#)),
("20241011125207_mint_urls.sql", include_str!(r#"./migrations/20241011125207_mint_urls.sql"#)),
("20241108092756_wallet_mint_quote_secretkey.sql", include_str!(r#"./migrations/20241108092756_wallet_mint_quote_secretkey.sql"#)),
("20250214135017_mint_tos.sql", include_str!(r#"./migrations/20250214135017_mint_tos.sql"#)),
("20250310111513_drop_nostr_last_checked.sql", include_str!(r#"./migrations/20250310111513_drop_nostr_last_checked.sql"#)),
("20250314082116_allow_pending_spent.sql", include_str!(r#"./migrations/20250314082116_allow_pending_spent.sql"#)),
("20250323152040_wallet_dleq_proofs.sql", include_str!(r#"./migrations/20250323152040_wallet_dleq_proofs.sql"#)),
("20250401120000_add_transactions_table.sql", include_str!(r#"./migrations/20250401120000_add_transactions_table.sql"#)),
("20250616144830_add_keyset_expiry.sql", include_str!(r#"./migrations/20250616144830_add_keyset_expiry.sql"#)),
("20250707093445_bolt12.sql", include_str!(r#"./migrations/20250707093445_bolt12.sql"#)),
];

File diff suppressed because it is too large Load Diff