mirror of
https://github.com/aljazceru/cdk.git
synced 2025-12-18 21:25:09 +01:00
Introduce run_db_operation_sync and run_db_operation
These functions are designed as a single funnel to talk to the database, whether it is synchronous or asynchronous. This single funnel will log SQL queries and slow operations, providing a clear and unified debug message for the problematic query, so it can be optimized accordingly (for instance, missing indexes or unbound SQL requests).
This commit is contained in:
@@ -1,13 +1,81 @@
|
|||||||
|
use std::fmt::Debug;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use cdk_common::database::Error;
|
||||||
|
|
||||||
use crate::database::DatabaseExecutor;
|
use crate::database::DatabaseExecutor;
|
||||||
use crate::stmt::query;
|
use crate::stmt::query;
|
||||||
|
|
||||||
|
const SLOW_QUERY_THRESHOLD_MS: u128 = 20;
|
||||||
|
|
||||||
|
/// Run a database operation and log slow operations, it also converts and logs any error with a
|
||||||
|
/// given info for more context. This function is expecting a synchronous database operation
|
||||||
|
#[inline(always)]
|
||||||
|
pub fn run_db_operation_sync<F, E, E1, T>(
|
||||||
|
info: &str,
|
||||||
|
operation: F,
|
||||||
|
error_map: E,
|
||||||
|
) -> Result<T, Error>
|
||||||
|
where
|
||||||
|
F: FnOnce() -> Result<T, E1>,
|
||||||
|
E1: Debug,
|
||||||
|
E: FnOnce(E1) -> Error,
|
||||||
|
{
|
||||||
|
let start = Instant::now();
|
||||||
|
|
||||||
|
tracing::trace!("Running db operation {}", info);
|
||||||
|
|
||||||
|
let result = operation().map_err(|e| {
|
||||||
|
tracing::error!("Query {} failed with error {:?}", info, e);
|
||||||
|
error_map(e)
|
||||||
|
});
|
||||||
|
|
||||||
|
let duration = start.elapsed();
|
||||||
|
if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
|
||||||
|
tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run a database operation and log slow operations, it also converts and logs any error with a
|
||||||
|
/// given info for more context
|
||||||
|
#[inline(always)]
|
||||||
|
pub async fn run_db_operation<Fut, E, E1, T>(
|
||||||
|
info: &str,
|
||||||
|
operation: Fut,
|
||||||
|
error_map: E,
|
||||||
|
) -> Result<T, Error>
|
||||||
|
where
|
||||||
|
Fut: Future<Output = Result<T, E1>>,
|
||||||
|
E1: Debug,
|
||||||
|
E: FnOnce(E1) -> Error,
|
||||||
|
{
|
||||||
|
let start = Instant::now();
|
||||||
|
|
||||||
|
tracing::trace!("Running db operation {}", info);
|
||||||
|
|
||||||
|
let result = operation.await.map_err(|e| {
|
||||||
|
tracing::error!("Query {} failed with error {:?}", info, e);
|
||||||
|
error_map(e)
|
||||||
|
});
|
||||||
|
|
||||||
|
let duration = start.elapsed();
|
||||||
|
if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
|
||||||
|
tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
/// Migrates the migration generated by `build.rs`
|
/// Migrates the migration generated by `build.rs`
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub async fn migrate<C>(
|
pub async fn migrate<C>(
|
||||||
conn: &C,
|
conn: &C,
|
||||||
db_prefix: &str,
|
db_prefix: &str,
|
||||||
migrations: &[(&str, &str, &str)],
|
migrations: &[(&str, &str, &str)],
|
||||||
) -> Result<(), cdk_common::database::Error>
|
) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
C: DatabaseExecutor,
|
C: DatabaseExecutor,
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ pub mod stmt;
|
|||||||
pub mod value;
|
pub mod value;
|
||||||
|
|
||||||
pub use cdk_common::database::ConversionError;
|
pub use cdk_common::database::ConversionError;
|
||||||
|
pub use common::{run_db_operation, run_db_operation_sync};
|
||||||
|
|
||||||
#[cfg(feature = "mint")]
|
#[cfg(feature = "mint")]
|
||||||
pub mod mint;
|
pub mod mint;
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
//! Simple SQLite
|
//! Simple SQLite
|
||||||
use cdk_common::database::Error;
|
use cdk_common::database::Error;
|
||||||
use cdk_sql_common::database::{DatabaseConnector, DatabaseExecutor, DatabaseTransaction};
|
use cdk_sql_common::database::{DatabaseConnector, DatabaseExecutor, DatabaseTransaction};
|
||||||
|
use cdk_sql_common::run_db_operation_sync;
|
||||||
use cdk_sql_common::stmt::{query, Column, SqlPart, Statement};
|
use cdk_sql_common::stmt::{query, Column, SqlPart, Statement};
|
||||||
use rusqlite::{ffi, CachedStatement, Connection, Error as SqliteError, ErrorCode};
|
use rusqlite::{ffi, CachedStatement, Connection, Error as SqliteError, ErrorCode};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
@@ -25,7 +26,7 @@ impl AsyncSqlite {
|
|||||||
&self,
|
&self,
|
||||||
conn: &'a Connection,
|
conn: &'a Connection,
|
||||||
statement: Statement,
|
statement: Statement,
|
||||||
) -> Result<CachedStatement<'a>, Error> {
|
) -> Result<(String, CachedStatement<'a>), Error> {
|
||||||
let (sql, placeholder_values) = statement.to_sql()?;
|
let (sql, placeholder_values) = statement.to_sql()?;
|
||||||
|
|
||||||
let new_sql = sql.trim().trim_end_matches("FOR UPDATE");
|
let new_sql = sql.trim().trim_end_matches("FOR UPDATE");
|
||||||
@@ -39,7 +40,7 @@ impl AsyncSqlite {
|
|||||||
.map_err(|e| Error::Database(Box::new(e)))?;
|
.map_err(|e| Error::Database(Box::new(e)))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(stmt)
|
Ok((sql, stmt))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -104,68 +105,81 @@ impl DatabaseExecutor for AsyncSqlite {
|
|||||||
async fn execute(&self, statement: Statement) -> Result<usize, Error> {
|
async fn execute(&self, statement: Statement) -> Result<usize, Error> {
|
||||||
let conn = self.inner.lock().await;
|
let conn = self.inner.lock().await;
|
||||||
|
|
||||||
let mut stmt = self
|
let (sql, mut stmt) = self
|
||||||
.get_stmt(&conn, statement)
|
.get_stmt(&conn, statement)
|
||||||
.map_err(|e| Error::Database(Box::new(e)))?;
|
.map_err(|e| Error::Database(Box::new(e)))?;
|
||||||
|
|
||||||
Ok(stmt.raw_execute().map_err(to_sqlite_error)?)
|
run_db_operation_sync(&sql, || stmt.raw_execute(), to_sqlite_error)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_one(&self, statement: Statement) -> Result<Option<Vec<Column>>, Error> {
|
async fn fetch_one(&self, statement: Statement) -> Result<Option<Vec<Column>>, Error> {
|
||||||
let conn = self.inner.lock().await;
|
let conn = self.inner.lock().await;
|
||||||
let mut stmt = self
|
let (sql, mut stmt) = self
|
||||||
.get_stmt(&conn, statement)
|
.get_stmt(&conn, statement)
|
||||||
.map_err(|e| Error::Database(Box::new(e)))?;
|
.map_err(|e| Error::Database(Box::new(e)))?;
|
||||||
|
|
||||||
let columns = stmt.column_count();
|
run_db_operation_sync(
|
||||||
|
&sql,
|
||||||
|
|| {
|
||||||
|
let columns = stmt.column_count();
|
||||||
|
|
||||||
let mut rows = stmt.raw_query();
|
let mut rows = stmt.raw_query();
|
||||||
rows.next()
|
rows.next()?
|
||||||
.map_err(to_sqlite_error)?
|
.map(|row| {
|
||||||
.map(|row| {
|
(0..columns)
|
||||||
(0..columns)
|
.map(|i| row.get(i).map(from_sqlite))
|
||||||
.map(|i| row.get(i).map(from_sqlite))
|
.collect::<Result<Vec<_>, _>>()
|
||||||
.collect::<Result<Vec<_>, _>>()
|
})
|
||||||
})
|
.transpose()
|
||||||
.transpose()
|
},
|
||||||
.map_err(to_sqlite_error)
|
to_sqlite_error,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_all(&self, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
|
async fn fetch_all(&self, statement: Statement) -> Result<Vec<Vec<Column>>, Error> {
|
||||||
let conn = self.inner.lock().await;
|
let conn = self.inner.lock().await;
|
||||||
let mut stmt = self
|
let (sql, mut stmt) = self
|
||||||
.get_stmt(&conn, statement)
|
.get_stmt(&conn, statement)
|
||||||
.map_err(|e| Error::Database(Box::new(e)))?;
|
.map_err(|e| Error::Database(Box::new(e)))?;
|
||||||
|
|
||||||
let columns = stmt.column_count();
|
let columns = stmt.column_count();
|
||||||
|
|
||||||
let mut rows = stmt.raw_query();
|
run_db_operation_sync(
|
||||||
let mut results = vec![];
|
&sql,
|
||||||
|
|| {
|
||||||
|
let mut rows = stmt.raw_query();
|
||||||
|
let mut results = vec![];
|
||||||
|
|
||||||
while let Some(row) = rows.next().map_err(to_sqlite_error)? {
|
while let Some(row) = rows.next()? {
|
||||||
results.push(
|
results.push(
|
||||||
(0..columns)
|
(0..columns)
|
||||||
.map(|i| row.get(i).map(from_sqlite))
|
.map(|i| row.get(i).map(from_sqlite))
|
||||||
.collect::<Result<Vec<_>, _>>()
|
.collect::<Result<Vec<_>, _>>()?,
|
||||||
.map_err(to_sqlite_error)?,
|
)
|
||||||
)
|
}
|
||||||
}
|
|
||||||
|
|
||||||
Ok(results)
|
Ok(results)
|
||||||
|
},
|
||||||
|
to_sqlite_error,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn pluck(&self, statement: Statement) -> Result<Option<Column>, Error> {
|
async fn pluck(&self, statement: Statement) -> Result<Option<Column>, Error> {
|
||||||
let conn = self.inner.lock().await;
|
let conn = self.inner.lock().await;
|
||||||
let mut stmt = self
|
let (sql, mut stmt) = self
|
||||||
.get_stmt(&conn, statement)
|
.get_stmt(&conn, statement)
|
||||||
.map_err(|e| Error::Database(Box::new(e)))?;
|
.map_err(|e| Error::Database(Box::new(e)))?;
|
||||||
|
|
||||||
let mut rows = stmt.raw_query();
|
run_db_operation_sync(
|
||||||
rows.next()
|
&sql,
|
||||||
.map_err(to_sqlite_error)?
|
|| {
|
||||||
.map(|row| row.get(0usize).map(from_sqlite))
|
let mut rows = stmt.raw_query();
|
||||||
.transpose()
|
rows.next()?
|
||||||
.map_err(to_sqlite_error)
|
.map(|row| row.get(0usize).map(from_sqlite))
|
||||||
|
.transpose()
|
||||||
|
},
|
||||||
|
to_sqlite_error,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn batch(&self, mut statement: Statement) -> Result<(), Error> {
|
async fn batch(&self, mut statement: Statement) -> Result<(), Error> {
|
||||||
@@ -187,11 +201,8 @@ impl DatabaseExecutor for AsyncSqlite {
|
|||||||
unreachable!()
|
unreachable!()
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let conn = self.inner.lock().await;
|
||||||
|
|
||||||
self.inner
|
run_db_operation_sync(&sql, || conn.execute_batch(&sql), to_sqlite_error)
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.execute_batch(&sql)
|
|
||||||
.map_err(to_sqlite_error)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user