Add PostgreSQL support for mint and wallet (#878)

* Add PostgreSQL support for mint and wallet

* Fixed bug to avoid empty calls `get_proofs_states`

* Fixed SQL bug

* Avoid redudant clone()

* Add more tests for the storage layer

* Minor enhacements

* Add a generic function to execute db operations

This function would log slow operations and log errors

* Provision a postgres db for tests

* Update deps for msrv

* Add postgres to pipeline

* feat: add psgl to example and docker

* feat: db url fmt

---------

Co-authored-by: thesimplekid <tsk@thesimplekid.com>
This commit is contained in:
C
2025-08-18 13:45:11 -03:00
committed by GitHub
parent 2e424e629f
commit 28a01398fd
34 changed files with 1282 additions and 40 deletions

View File

@@ -0,0 +1,35 @@
[package]
name = "cdk-postgres"
version.workspace = true
edition.workspace = true
authors = ["CDK Developers"]
description = "PostgreSQL storage backend for CDK"
license.workspace = true
homepage = "https://github.com/cashubtc/cdk"
repository = "https://github.com/cashubtc/cdk.git"
rust-version.workspace = true # MSRV
readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["mint", "wallet", "auth"]
mint = ["cdk-common/mint", "cdk-sql-common/mint"]
wallet = ["cdk-common/wallet", "cdk-sql-common/wallet"]
auth = ["cdk-common/auth", "cdk-sql-common/auth"]
[dependencies]
async-trait.workspace = true
cdk-common = { workspace = true, features = ["test"] }
bitcoin.workspace = true
cdk-sql-common = { workspace = true }
thiserror.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tracing.workspace = true
serde.workspace = true
serde_json.workspace = true
lightning-invoice.workspace = true
uuid.workspace = true
tokio-postgres = "0.7.13"
futures-util = "0.3.31"
postgres-native-tls = "0.5.1"
once_cell.workspace = true

View File

@@ -0,0 +1,155 @@
use cdk_common::database::Error;
use cdk_sql_common::run_db_operation;
use cdk_sql_common::stmt::{Column, Statement};
use futures_util::{pin_mut, TryStreamExt};
use tokio_postgres::error::SqlState;
use tokio_postgres::{Client, Error as PgError};
use crate::value::PgValue;
#[inline(always)]
fn to_pgsql_error(err: PgError) -> Error {
if let Some(err) = err.as_db_error() {
let code = err.code().to_owned();
if code == SqlState::INTEGRITY_CONSTRAINT_VIOLATION || code == SqlState::UNIQUE_VIOLATION {
return Error::Duplicate;
}
}
Error::Database(Box::new(err))
}
#[inline(always)]
pub async fn pg_batch(conn: &Client, statement: Statement) -> Result<(), Error> {
let (sql, _placeholder_values) = statement.to_sql()?;
run_db_operation(&sql, conn.batch_execute(&sql), to_pgsql_error).await
}
#[inline(always)]
pub async fn pg_execute(conn: &Client, statement: Statement) -> Result<usize, Error> {
let (sql, placeholder_values) = statement.to_sql()?;
let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
run_db_operation(
&sql,
async {
conn.execute_raw(
&prepared_statement,
placeholder_values
.iter()
.map(|x| x.into())
.collect::<Vec<PgValue>>(),
)
.await
.map(|x| x as usize)
},
to_pgsql_error,
)
.await
}
#[inline(always)]
pub async fn pg_fetch_one(
conn: &Client,
statement: Statement,
) -> Result<Option<Vec<Column>>, Error> {
let (sql, placeholder_values) = statement.to_sql()?;
let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
run_db_operation(
&sql,
async {
let stream = conn
.query_raw(
&prepared_statement,
placeholder_values
.iter()
.map(|x| x.into())
.collect::<Vec<PgValue>>(),
)
.await?;
pin_mut!(stream);
stream
.try_next()
.await?
.map(|row| {
(0..row.len())
.map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
.collect::<Result<Vec<_>, _>>()
})
.transpose()
},
to_pgsql_error,
)
.await
}
#[inline(always)]
pub async fn pg_fetch_all(conn: &Client, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
let (sql, placeholder_values) = statement.to_sql()?;
let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
run_db_operation(
&sql,
async {
let stream = conn
.query_raw(
&prepared_statement,
placeholder_values
.iter()
.map(|x| x.into())
.collect::<Vec<PgValue>>(),
)
.await?;
pin_mut!(stream);
let mut rows = vec![];
while let Some(row) = stream.try_next().await? {
rows.push(
(0..row.len())
.map(|i| row.try_get::<_, PgValue>(i).map(|value| value.into()))
.collect::<Result<Vec<_>, _>>()?,
);
}
Ok(rows)
},
to_pgsql_error,
)
.await
}
#[inline(always)]
pub async fn pg_pluck(conn: &Client, statement: Statement) -> Result<Option<Column>, Error> {
let (sql, placeholder_values) = statement.to_sql()?;
let prepared_statement = conn.prepare(&sql).await.map_err(to_pgsql_error)?;
run_db_operation(
&sql,
async {
let stream = conn
.query_raw(
&prepared_statement,
placeholder_values
.iter()
.map(|x| x.into())
.collect::<Vec<PgValue>>(),
)
.await?;
pin_mut!(stream);
stream
.try_next()
.await?
.map(|row| row.try_get::<_, PgValue>(0).map(|value| value.into()))
.transpose()
},
to_pgsql_error,
)
.await
}

View File

@@ -0,0 +1,257 @@
use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use cdk_common::database::Error;
use cdk_sql_common::database::{DatabaseConnector, DatabaseExecutor, GenericTransactionHandler};
use cdk_sql_common::mint::SQLMintAuthDatabase;
use cdk_sql_common::pool::{DatabaseConfig, DatabasePool};
use cdk_sql_common::stmt::{Column, Statement};
use cdk_sql_common::{SQLMintDatabase, SQLWalletDatabase};
use db::{pg_batch, pg_execute, pg_fetch_all, pg_fetch_one, pg_pluck};
use tokio::sync::{Mutex, Notify};
use tokio::time::timeout;
use tokio_postgres::{connect, Client, Error as PgError, NoTls};
mod db;
mod value;
#[derive(Debug)]
pub struct PgConnectionPool;
#[derive(Clone)]
pub enum SslMode {
NoTls(NoTls),
NativeTls(postgres_native_tls::MakeTlsConnector),
}
impl Default for SslMode {
fn default() -> Self {
SslMode::NoTls(NoTls {})
}
}
impl Debug for SslMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let debug_text = match self {
Self::NoTls(_) => "NoTls",
Self::NativeTls(_) => "NativeTls",
};
write!(f, "SslMode::{debug_text}")
}
}
/// Postgres configuration
#[derive(Clone, Debug)]
pub struct PgConfig {
url: String,
tls: SslMode,
}
impl DatabaseConfig for PgConfig {
fn default_timeout(&self) -> Duration {
Duration::from_secs(10)
}
fn max_size(&self) -> usize {
20
}
}
impl From<&str> for PgConfig {
fn from(value: &str) -> Self {
PgConfig {
url: value.to_owned(),
tls: Default::default(),
}
}
}
impl DatabasePool for PgConnectionPool {
type Config = PgConfig;
type Connection = PostgresConnection;
type Error = PgError;
fn new_resource(
config: &Self::Config,
still_valid: Arc<AtomicBool>,
timeout: Duration,
) -> Result<Self::Connection, cdk_sql_common::pool::Error<Self::Error>> {
Ok(PostgresConnection::new(
config.to_owned(),
timeout,
still_valid,
))
}
}
/// A postgres connection
#[derive(Debug)]
pub struct PostgresConnection {
timeout: Duration,
error: Arc<Mutex<Option<cdk_common::database::Error>>>,
result: Arc<OnceLock<Client>>,
notify: Arc<Notify>,
}
impl PostgresConnection {
/// Creates a new instance
pub fn new(config: PgConfig, timeout: Duration, still_valid: Arc<AtomicBool>) -> Self {
let failed = Arc::new(Mutex::new(None));
let result = Arc::new(OnceLock::new());
let notify = Arc::new(Notify::new());
let error_clone = failed.clone();
let result_clone = result.clone();
let notify_clone = notify.clone();
tokio::spawn(async move {
match config.tls {
SslMode::NoTls(tls) => {
let (client, connection) = match connect(&config.url, tls).await {
Ok((client, connection)) => (client, connection),
Err(err) => {
*error_clone.lock().await =
Some(cdk_common::database::Error::Database(Box::new(err)));
still_valid.store(false, std::sync::atomic::Ordering::Release);
notify_clone.notify_waiters();
return;
}
};
tokio::spawn(async move {
let _ = connection.await;
still_valid.store(false, std::sync::atomic::Ordering::Release);
});
let _ = result_clone.set(client);
notify_clone.notify_waiters();
}
SslMode::NativeTls(tls) => {
let (client, connection) = match connect(&config.url, tls).await {
Ok((client, connection)) => (client, connection),
Err(err) => {
*error_clone.lock().await =
Some(cdk_common::database::Error::Database(Box::new(err)));
still_valid.store(false, std::sync::atomic::Ordering::Release);
notify_clone.notify_waiters();
return;
}
};
tokio::spawn(async move {
let _ = connection.await;
still_valid.store(false, std::sync::atomic::Ordering::Release);
});
let _ = result_clone.set(client);
notify_clone.notify_waiters();
}
}
});
Self {
error: failed,
timeout,
result,
notify,
}
}
/// Gets the wrapped instance or the connection error. The connection is returned as reference,
/// and the actual error is returned once, next times a generic error would be returned
async fn inner(&self) -> Result<&Client, cdk_common::database::Error> {
if let Some(client) = self.result.get() {
return Ok(client);
}
if let Some(error) = self.error.lock().await.take() {
return Err(error);
}
if timeout(self.timeout, self.notify.notified()).await.is_err() {
return Err(cdk_common::database::Error::Internal("Timeout".to_owned()));
}
// Check result again
if let Some(client) = self.result.get() {
Ok(client)
} else if let Some(error) = self.error.lock().await.take() {
Err(error)
} else {
Err(cdk_common::database::Error::Internal(
"Failed connection".to_owned(),
))
}
}
}
#[async_trait::async_trait]
impl DatabaseConnector for PostgresConnection {
type Transaction = GenericTransactionHandler<Self>;
}
#[async_trait::async_trait]
impl DatabaseExecutor for PostgresConnection {
fn name() -> &'static str {
"postgres"
}
async fn execute(&self, statement: Statement) -> Result<usize, Error> {
pg_execute(self.inner().await?, statement).await
}
async fn fetch_one(&self, statement: Statement) -> Result<Option<Vec<Column>>, Error> {
pg_fetch_one(self.inner().await?, statement).await
}
async fn fetch_all(&self, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
pg_fetch_all(self.inner().await?, statement).await
}
async fn pluck(&self, statement: Statement) -> Result<Option<Column>, Error> {
pg_pluck(self.inner().await?, statement).await
}
async fn batch(&self, statement: Statement) -> Result<(), Error> {
pg_batch(self.inner().await?, statement).await
}
}
/// Mint DB implementation with PostgreSQL
pub type MintPgDatabase = SQLMintDatabase<PgConnectionPool>;
/// Mint Auth database with Postgres
#[cfg(feature = "auth")]
pub type MintPgAuthDatabase = SQLMintAuthDatabase<PgConnectionPool>;
/// Mint DB implementation with PostgresSQL
pub type WalletPgDatabase = SQLWalletDatabase<PgConnectionPool>;
#[cfg(test)]
mod test {
use cdk_common::mint_db_test;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use super::*;
static MIGRATION_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
async fn provide_db() -> MintPgDatabase {
let m = MIGRATION_LOCK.lock().await;
let db_url = std::env::var("CDK_MINTD_DATABASE_URL")
.or_else(|_| std::env::var("PG_DB_URL")) // Fallback for compatibility
.unwrap_or("host=localhost user=test password=test dbname=testdb port=5433".to_owned());
let db = MintPgDatabase::new(db_url.as_str())
.await
.expect("database");
drop(m);
db
}
mint_db_test!(provide_db);
}

View File

@@ -0,0 +1,130 @@
use std::fmt::Debug;
use cdk_sql_common::value::Value;
use tokio_postgres::types::{self, FromSql, ToSql};
#[derive(Debug)]
pub enum PgValue<'a> {
Null,
Integer(i64),
Real(f64),
Text(&'a str),
Blob(&'a [u8]),
}
impl<'a> From<&'a Value> for PgValue<'a> {
fn from(value: &'a Value) -> Self {
match value {
Value::Blob(b) => PgValue::Blob(b),
Value::Text(text) => PgValue::Text(text.as_str()),
Value::Null => PgValue::Null,
Value::Integer(i) => PgValue::Integer(*i),
Value::Real(r) => PgValue::Real(*r),
}
}
}
impl<'a> From<PgValue<'a>> for Value {
fn from(val: PgValue<'a>) -> Self {
match val {
PgValue::Blob(value) => Value::Blob(value.to_owned()),
PgValue::Text(value) => Value::Text(value.to_owned()),
PgValue::Null => Value::Null,
PgValue::Integer(n) => Value::Integer(n),
PgValue::Real(r) => Value::Real(r),
}
}
}
impl<'a> FromSql<'a> for PgValue<'a> {
fn accepts(_ty: &types::Type) -> bool {
true
}
fn from_sql(
ty: &types::Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
Ok(match *ty {
types::Type::VARCHAR | types::Type::TEXT | types::Type::BPCHAR | types::Type::NAME => {
PgValue::Text(<&str as FromSql>::from_sql(ty, raw)?)
}
types::Type::BOOL => PgValue::Integer(if <bool as FromSql>::from_sql(ty, raw)? {
1
} else {
0
}),
types::Type::INT2 => PgValue::Integer(<i8 as FromSql>::from_sql(ty, raw)? as i64),
types::Type::INT4 => PgValue::Integer(<i32 as FromSql>::from_sql(ty, raw)? as i64),
types::Type::INT8 => PgValue::Integer(<i64 as FromSql>::from_sql(ty, raw)?),
types::Type::BIT_ARRAY | types::Type::BYTEA | types::Type::UNKNOWN => {
PgValue::Blob(<&[u8] as FromSql>::from_sql(ty, raw)?)
}
_ => panic!("Unsupported type {ty:?}"),
})
}
fn from_sql_null(_ty: &types::Type) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
Ok(PgValue::Null)
}
}
impl ToSql for PgValue<'_> {
fn to_sql(
&self,
ty: &types::Type,
out: &mut types::private::BytesMut,
) -> Result<types::IsNull, Box<dyn std::error::Error + Sync + Send>>
where
Self: Sized,
{
match self {
PgValue::Blob(blob) => (*blob).to_sql(ty, out),
PgValue::Text(text) => (*text).to_sql(ty, out),
PgValue::Null => Ok(types::IsNull::Yes),
PgValue::Real(r) => r.to_sql(ty, out),
PgValue::Integer(i) => match *ty {
types::Type::BOOL => (*i != 0).to_sql(ty, out),
types::Type::INT2 => (*i as i16).to_sql(ty, out),
types::Type::INT4 => (*i as i32).to_sql(ty, out),
_ => i.to_sql_checked(ty, out),
},
}
}
fn accepts(_ty: &types::Type) -> bool
where
Self: Sized,
{
true
}
fn encode_format(&self, ty: &types::Type) -> types::Format {
match self {
PgValue::Blob(blob) => blob.encode_format(ty),
PgValue::Text(text) => text.encode_format(ty),
PgValue::Null => types::Format::Text,
PgValue::Real(r) => r.encode_format(ty),
PgValue::Integer(i) => i.encode_format(ty),
}
}
fn to_sql_checked(
&self,
ty: &types::Type,
out: &mut types::private::BytesMut,
) -> Result<types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
match self {
PgValue::Blob(blob) => blob.to_sql_checked(ty, out),
PgValue::Text(text) => text.to_sql_checked(ty, out),
PgValue::Null => Ok(types::IsNull::Yes),
PgValue::Real(r) => r.to_sql_checked(ty, out),
PgValue::Integer(i) => match *ty {
types::Type::BOOL => (*i != 0).to_sql_checked(ty, out),
types::Type::INT2 => (*i as i16).to_sql_checked(ty, out),
types::Type::INT4 => (*i as i32).to_sql_checked(ty, out),
_ => i.to_sql_checked(ty, out),
},
}
}
}

View File

@@ -0,0 +1,30 @@
#!/usr/bin/env bash
set -euo pipefail
CONTAINER_NAME="rust-test-pg"
DB_USER="test"
DB_PASS="test"
DB_NAME="testdb"
DB_PORT="5433"
echo "Starting fresh PostgreSQL container..."
docker run -d --rm \
--name "${CONTAINER_NAME}" \
-e POSTGRES_USER="${DB_USER}" \
-e POSTGRES_PASSWORD="${DB_PASS}" \
-e POSTGRES_DB="${DB_NAME}" \
-p ${DB_PORT}:5432 \
postgres:16
echo "Waiting for PostgreSQL to be ready and database '${DB_NAME}' to exist..."
until docker exec -e PGPASSWORD="${DB_PASS}" "${CONTAINER_NAME}" \
psql -U "${DB_USER}" -d "${DB_NAME}" -c "SELECT 1;" >/dev/null 2>&1; do
sleep 0.5
done
docker exec -e PGPASSWORD="${DB_PASS}" "${CONTAINER_NAME}" \
psql -U "${DB_USER}" -d "${DB_NAME}" -c "CREATE DATABASE mintdb;"
docker exec -e PGPASSWORD="${DB_PASS}" "${CONTAINER_NAME}" \
psql -U "${DB_USER}" -d "${DB_NAME}" -c "CREATE DATABASE mintdb_auth;"
export DATABASE_URL="host=localhost user=${DB_USER} password=${DB_PASS} dbname=${DB_NAME} port=${DB_PORT}"