From 937d9ac43b68bda67aa685bd4531bfb0df8fee33 Mon Sep 17 00:00:00 2001 From: Cesar Rodas Date: Mon, 11 Aug 2025 11:55:52 -0300 Subject: [PATCH] Introduce `run_db_operation_sync` and `run_db_operation` These functions are designed as a single funnel to talk to the database, whether it is synchronous or asynchronous. This single funnel will log SQL queries and slow operations, providing a clear and unified debug message for the problematic query, so it can be optimized accordingly (for instance, missing indexes or unbound SQL requests). --- crates/cdk-sql-common/src/common.rs | 70 ++++++++++++++++++++- crates/cdk-sql-common/src/lib.rs | 1 + crates/cdk-sqlite/src/async_sqlite.rs | 91 +++++++++++++++------------ 3 files changed, 121 insertions(+), 41 deletions(-) diff --git a/crates/cdk-sql-common/src/common.rs b/crates/cdk-sql-common/src/common.rs index f75bb63c..fb85e765 100644 --- a/crates/cdk-sql-common/src/common.rs +++ b/crates/cdk-sql-common/src/common.rs @@ -1,13 +1,81 @@ +use std::fmt::Debug; +use std::future::Future; +use std::time::Instant; + +use cdk_common::database::Error; + use crate::database::DatabaseExecutor; use crate::stmt::query; +const SLOW_QUERY_THRESHOLD_MS: u128 = 20; + +/// Run a database operation and log slow operations, it also converts and logs any error with a +/// given info for more context. This function is expecting a synchronous database operation +#[inline(always)] +pub fn run_db_operation_sync( + info: &str, + operation: F, + error_map: E, +) -> Result +where + F: FnOnce() -> Result, + E1: Debug, + E: FnOnce(E1) -> Error, +{ + let start = Instant::now(); + + tracing::trace!("Running db operation {}", info); + + let result = operation().map_err(|e| { + tracing::error!("Query {} failed with error {:?}", info, e); + error_map(e) + }); + + let duration = start.elapsed(); + if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS { + tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info); + } + + result +} + +/// Run a database operation and log slow operations, it also converts and logs any error with a +/// given info for more context +#[inline(always)] +pub async fn run_db_operation( + info: &str, + operation: Fut, + error_map: E, +) -> Result +where + Fut: Future>, + E1: Debug, + E: FnOnce(E1) -> Error, +{ + let start = Instant::now(); + + tracing::trace!("Running db operation {}", info); + + let result = operation.await.map_err(|e| { + tracing::error!("Query {} failed with error {:?}", info, e); + error_map(e) + }); + + let duration = start.elapsed(); + if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS { + tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info); + } + + result +} + /// Migrates the migration generated by `build.rs` #[inline(always)] pub async fn migrate( conn: &C, db_prefix: &str, migrations: &[(&str, &str, &str)], -) -> Result<(), cdk_common::database::Error> +) -> Result<(), Error> where C: DatabaseExecutor, { diff --git a/crates/cdk-sql-common/src/lib.rs b/crates/cdk-sql-common/src/lib.rs index 8c44b9a4..03020414 100644 --- a/crates/cdk-sql-common/src/lib.rs +++ b/crates/cdk-sql-common/src/lib.rs @@ -11,6 +11,7 @@ pub mod stmt; pub mod value; pub use cdk_common::database::ConversionError; +pub use common::{run_db_operation, run_db_operation_sync}; #[cfg(feature = "mint")] pub mod mint; diff --git a/crates/cdk-sqlite/src/async_sqlite.rs b/crates/cdk-sqlite/src/async_sqlite.rs index 2efe47e1..ce63ed1e 100644 --- a/crates/cdk-sqlite/src/async_sqlite.rs +++ b/crates/cdk-sqlite/src/async_sqlite.rs @@ -1,6 +1,7 @@ //! Simple SQLite use cdk_common::database::Error; use cdk_sql_common::database::{DatabaseConnector, DatabaseExecutor, DatabaseTransaction}; +use cdk_sql_common::run_db_operation_sync; use cdk_sql_common::stmt::{query, Column, SqlPart, Statement}; use rusqlite::{ffi, CachedStatement, Connection, Error as SqliteError, ErrorCode}; use tokio::sync::Mutex; @@ -25,7 +26,7 @@ impl AsyncSqlite { &self, conn: &'a Connection, statement: Statement, - ) -> Result, Error> { + ) -> Result<(String, CachedStatement<'a>), Error> { let (sql, placeholder_values) = statement.to_sql()?; let new_sql = sql.trim().trim_end_matches("FOR UPDATE"); @@ -39,7 +40,7 @@ impl AsyncSqlite { .map_err(|e| Error::Database(Box::new(e)))?; } - Ok(stmt) + Ok((sql, stmt)) } } @@ -104,68 +105,81 @@ impl DatabaseExecutor for AsyncSqlite { async fn execute(&self, statement: Statement) -> Result { let conn = self.inner.lock().await; - let mut stmt = self + let (sql, mut stmt) = self .get_stmt(&conn, statement) .map_err(|e| Error::Database(Box::new(e)))?; - Ok(stmt.raw_execute().map_err(to_sqlite_error)?) + run_db_operation_sync(&sql, || stmt.raw_execute(), to_sqlite_error) } async fn fetch_one(&self, statement: Statement) -> Result>, Error> { let conn = self.inner.lock().await; - let mut stmt = self + let (sql, mut stmt) = self .get_stmt(&conn, statement) .map_err(|e| Error::Database(Box::new(e)))?; - let columns = stmt.column_count(); + run_db_operation_sync( + &sql, + || { + let columns = stmt.column_count(); - let mut rows = stmt.raw_query(); - rows.next() - .map_err(to_sqlite_error)? - .map(|row| { - (0..columns) - .map(|i| row.get(i).map(from_sqlite)) - .collect::, _>>() - }) - .transpose() - .map_err(to_sqlite_error) + let mut rows = stmt.raw_query(); + rows.next()? + .map(|row| { + (0..columns) + .map(|i| row.get(i).map(from_sqlite)) + .collect::, _>>() + }) + .transpose() + }, + to_sqlite_error, + ) } async fn fetch_all(&self, statement: Statement) -> Result>, Error> { let conn = self.inner.lock().await; - let mut stmt = self + let (sql, mut stmt) = self .get_stmt(&conn, statement) .map_err(|e| Error::Database(Box::new(e)))?; let columns = stmt.column_count(); - let mut rows = stmt.raw_query(); - let mut results = vec![]; + run_db_operation_sync( + &sql, + || { + let mut rows = stmt.raw_query(); + let mut results = vec![]; - while let Some(row) = rows.next().map_err(to_sqlite_error)? { - results.push( - (0..columns) - .map(|i| row.get(i).map(from_sqlite)) - .collect::, _>>() - .map_err(to_sqlite_error)?, - ) - } + while let Some(row) = rows.next()? { + results.push( + (0..columns) + .map(|i| row.get(i).map(from_sqlite)) + .collect::, _>>()?, + ) + } - Ok(results) + Ok(results) + }, + to_sqlite_error, + ) } async fn pluck(&self, statement: Statement) -> Result, Error> { let conn = self.inner.lock().await; - let mut stmt = self + let (sql, mut stmt) = self .get_stmt(&conn, statement) .map_err(|e| Error::Database(Box::new(e)))?; - let mut rows = stmt.raw_query(); - rows.next() - .map_err(to_sqlite_error)? - .map(|row| row.get(0usize).map(from_sqlite)) - .transpose() - .map_err(to_sqlite_error) + run_db_operation_sync( + &sql, + || { + let mut rows = stmt.raw_query(); + rows.next()? + .map(|row| row.get(0usize).map(from_sqlite)) + .transpose() + }, + to_sqlite_error, + ) } async fn batch(&self, mut statement: Statement) -> Result<(), Error> { @@ -187,11 +201,8 @@ impl DatabaseExecutor for AsyncSqlite { unreachable!() } }; + let conn = self.inner.lock().await; - self.inner - .lock() - .await - .execute_batch(&sql) - .map_err(to_sqlite_error) + run_db_operation_sync(&sql, || conn.execute_batch(&sql), to_sqlite_error) } }