Prometheus crate (#883)

* feat: introduce `cdk-prometheus` crate with Prometheus server and CDK-specific metrics support
This commit is contained in:
asmo
2025-09-09 14:26:03 +02:00
committed by GitHub
parent c94979a357
commit 75a3e6d2c7
39 changed files with 4504 additions and 361 deletions

View File

@@ -16,10 +16,11 @@ default = ["mint", "wallet", "auth"]
mint = ["cdk-common/mint"]
wallet = ["cdk-common/wallet"]
auth = ["cdk-common/auth"]
prometheus = ["cdk-prometheus"]
[dependencies]
async-trait.workspace = true
cdk-common = { workspace = true, features = ["test"] }
cdk-prometheus = { workspace = true, optional = true }
bitcoin.workspace = true
thiserror.workspace = true
tracing.workspace = true

View File

@@ -57,6 +57,8 @@ mod migrations;
#[cfg(feature = "auth")]
pub use auth::SQLMintAuthDatabase;
#[cfg(feature = "prometheus")]
use cdk_prometheus::METRICS;
/// Mint SQL Database
#[derive(Debug, Clone)]
@@ -299,11 +301,27 @@ where
type Err = Error;
async fn commit(self: Box<Self>) -> Result<(), Error> {
self.inner.commit().await
let result = self.inner.commit().await;
#[cfg(feature = "prometheus")]
{
let success = result.is_ok();
METRICS.record_mint_operation("transaction_commit", success);
METRICS.record_mint_operation_histogram("transaction_commit", success, 1.0);
}
Ok(result?)
}
async fn rollback(self: Box<Self>) -> Result<(), Error> {
self.inner.rollback().await
let result = self.inner.rollback().await;
#[cfg(feature = "prometheus")]
{
let success = result.is_ok();
METRICS.record_mint_operation("transaction_rollback", success);
METRICS.record_mint_operation_histogram("transaction_rollback", success, 1.0);
}
Ok(result?)
}
}
@@ -443,12 +461,14 @@ where
async fn begin_transaction<'a>(
&'a self,
) -> Result<Box<dyn MintKeyDatabaseTransaction<'a, Error> + Send + Sync + 'a>, Error> {
Ok(Box::new(SQLTransaction {
let tx = SQLTransaction {
inner: ConnectionWithTransaction::new(
self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
)
.await?,
}))
};
Ok(Box::new(tx))
}
async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> {
@@ -1072,35 +1092,58 @@ where
type Err = Error;
async fn get_mint_quote(&self, quote_id: &QuoteId) -> Result<Option<MintQuote>, Self::Err> {
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("get_mint_quote");
#[cfg(feature = "prometheus")]
let start_time = std::time::Instant::now();
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let payments = get_mint_quote_payments(&*conn, quote_id).await?;
let issuance = get_mint_quote_issuance(&*conn, quote_id).await?;
let result = async {
let payments = get_mint_quote_payments(&*conn, quote_id).await?;
let issuance = get_mint_quote_issuance(&*conn, quote_id).await?;
Ok(query(
r#"
SELECT
id,
amount,
unit,
request,
expiry,
request_lookup_id,
pubkey,
created_time,
amount_paid,
amount_issued,
payment_method,
request_lookup_id_kind
FROM
mint_quote
WHERE id = :id"#,
)?
.bind("id", quote_id.to_string())
.fetch_one(&*conn)
.await?
.map(|row| sql_row_to_mint_quote(row, payments, issuance))
.transpose()?)
query(
r#"
SELECT
id,
amount,
unit,
request,
expiry,
request_lookup_id,
pubkey,
created_time,
amount_paid,
amount_issued,
payment_method,
request_lookup_id_kind
FROM
mint_quote
WHERE id = :id"#,
)?
.bind("id", quote_id.to_string())
.fetch_one(&*conn)
.await?
.map(|row| sql_row_to_mint_quote(row, payments, issuance))
.transpose()
}
.await;
#[cfg(feature = "prometheus")]
{
let success = result.is_ok();
METRICS.record_mint_operation("get_mint_quote", success);
METRICS.record_mint_operation_histogram(
"get_mint_quote",
success,
start_time.elapsed().as_secs_f64(),
);
METRICS.dec_in_flight_requests("get_mint_quote");
}
result
}
async fn get_mint_quote_by_request(
@@ -1228,35 +1271,59 @@ where
&self,
quote_id: &QuoteId,
) -> Result<Option<mint::MeltQuote>, Self::Err> {
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("get_melt_quote");
#[cfg(feature = "prometheus")]
let start_time = std::time::Instant::now();
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"
SELECT
id,
unit,
amount,
request,
fee_reserve,
expiry,
state,
payment_preimage,
request_lookup_id,
created_time,
paid_time,
payment_method,
options,
request_lookup_id_kind
FROM
melt_quote
WHERE
id=:id
"#,
)?
.bind("id", quote_id.to_string())
.fetch_one(&*conn)
.await?
.map(sql_row_to_melt_quote)
.transpose()?)
let result = async {
query(
r#"
SELECT
id,
unit,
amount,
request,
fee_reserve,
expiry,
state,
payment_preimage,
request_lookup_id,
created_time,
paid_time,
payment_method,
options,
request_lookup_id_kind
FROM
melt_quote
WHERE
id=:id
"#,
)?
.bind("id", quote_id.to_string())
.fetch_one(&*conn)
.await?
.map(sql_row_to_melt_quote)
.transpose()
}
.await;
#[cfg(feature = "prometheus")]
{
let success = result.is_ok();
METRICS.record_mint_operation("get_melt_quote", success);
METRICS.record_mint_operation_histogram(
"get_melt_quote",
success,
start_time.elapsed().as_secs_f64(),
);
METRICS.dec_in_flight_requests("get_melt_quote");
}
result
}
async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
@@ -1826,20 +1893,64 @@ where
async fn begin_transaction<'a>(
&'a self,
) -> Result<Box<dyn database::MintTransaction<'a, Error> + Send + Sync + 'a>, Error> {
Ok(Box::new(SQLTransaction {
let tx = SQLTransaction {
inner: ConnectionWithTransaction::new(
self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
)
.await?,
}))
};
Ok(Box::new(tx))
}
async fn get_mint_info(&self) -> Result<MintInfo, Error> {
Ok(self.fetch_from_config("mint_info").await?)
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("get_mint_info");
#[cfg(feature = "prometheus")]
let start_time = std::time::Instant::now();
let result = self.fetch_from_config("mint_info").await;
#[cfg(feature = "prometheus")]
{
let success = result.is_ok();
METRICS.record_mint_operation("get_mint_info", success);
METRICS.record_mint_operation_histogram(
"get_mint_info",
success,
start_time.elapsed().as_secs_f64(),
);
METRICS.dec_in_flight_requests("get_mint_info");
}
Ok(result?)
}
async fn get_quote_ttl(&self) -> Result<QuoteTTL, Error> {
Ok(self.fetch_from_config("quote_ttl").await?)
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("get_quote_ttl");
#[cfg(feature = "prometheus")]
let start_time = std::time::Instant::now();
let result = self.fetch_from_config("quote_ttl").await;
#[cfg(feature = "prometheus")]
{
let success = result.is_ok();
METRICS.record_mint_operation("get_quote_ttl", success);
METRICS.record_mint_operation_histogram(
"get_quote_ttl",
success,
start_time.elapsed().as_secs_f64(),
);
METRICS.dec_in_flight_requests("get_quote_ttl");
}
Ok(result?)
}
}

View File

@@ -6,9 +6,10 @@ use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::time::Instant;
#[cfg(feature = "prometheus")]
use cdk_prometheus::metrics::METRICS;
use crate::database::DatabaseConnector;
@@ -86,6 +87,8 @@ where
{
resource: Option<(Arc<AtomicBool>, RM::Connection)>,
pool: Arc<Pool<RM>>,
#[cfg(feature = "prometheus")]
start_time: Instant,
}
impl<RM> Debug for PooledResource<RM>
@@ -105,7 +108,16 @@ where
if let Some(resource) = self.resource.take() {
let mut active_resource = self.pool.queue.lock().expect("active_resource");
active_resource.push(resource);
self.pool.in_use.fetch_sub(1, Ordering::AcqRel);
let _in_use = self.pool.in_use.fetch_sub(1, Ordering::AcqRel);
#[cfg(feature = "prometheus")]
{
METRICS.set_db_connections_active(_in_use as i64);
let duration = self.start_time.elapsed().as_secs_f64();
METRICS.record_db_operation(duration, "drop");
}
// Notify a waiting thread
self.pool.waiter.notify_one();
@@ -155,6 +167,18 @@ where
self.get_timeout(self.default_timeout)
}
/// Increments the in_use connection counter and updates the metric
fn increment_connection_counter(&self) -> usize {
let in_use = self.in_use.fetch_add(1, Ordering::AcqRel);
#[cfg(feature = "prometheus")]
{
METRICS.set_db_connections_active(in_use as i64);
}
in_use
}
/// Get a new resource or fail after timeout is reached.
///
/// This function will return a free resource or create a new one if there is still room for it;
@@ -171,18 +195,20 @@ where
if let Some((stale, resource)) = resources.pop() {
if !stale.load(Ordering::SeqCst) {
drop(resources);
self.in_use.fetch_add(1, Ordering::AcqRel);
self.increment_connection_counter();
return Ok(PooledResource {
resource: Some((stale, resource)),
pool: self.clone(),
#[cfg(feature = "prometheus")]
start_time: Instant::now(),
});
}
}
if self.in_use.load(Ordering::Relaxed) < self.max_size {
drop(resources);
self.in_use.fetch_add(1, Ordering::AcqRel);
self.increment_connection_counter();
let stale: Arc<AtomicBool> = Arc::new(false.into());
return Ok(PooledResource {
@@ -191,6 +217,8 @@ where
RM::new_resource(&self.config, stale, timeout)?,
)),
pool: self.clone(),
#[cfg(feature = "prometheus")]
start_time: Instant::now(),
});
}