pub mod params; pub mod value; pub use value::Value; pub use params::params_from_iter; use crate::params::*; use std::fmt::Debug; use std::num::NonZero; use std::rc::Rc; use std::sync::{Arc, Mutex}; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("SQL conversion failure: `{0}`")] ToSqlConversionFailure(BoxError), #[error("Mutex lock error: {0}")] MutexError(String), #[error("SQL execution failure: `{0}`")] SqlExecutionFailure(String), } impl From for Error { fn from(err: limbo_core::LimboError) -> Self { Error::SqlExecutionFailure(err.to_string()) } } pub(crate) type BoxError = Box; pub type Result = std::result::Result; pub struct Builder { path: String, } impl Builder { pub fn new_local(path: &str) -> Self { Self { path: path.to_string(), } } #[allow(unused_variables, clippy::arc_with_non_send_sync)] pub async fn build(self) -> Result { match self.path.as_str() { ":memory:" => { let io: Arc = Arc::new(limbo_core::MemoryIO::new()); let db = limbo_core::Database::open_file(io, self.path.as_str(), false)?; Ok(Database { inner: db }) } path => { let io: Arc = Arc::new(limbo_core::PlatformIO::new()?); let db = limbo_core::Database::open_file(io, path, false)?; Ok(Database { inner: db }) } } } } #[derive(Clone)] pub struct Database { inner: Arc, } unsafe impl Send for Database {} unsafe impl Sync for Database {} impl Debug for Database { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Database").finish() } } impl Database { pub fn connect(&self) -> Result { let conn = self.inner.connect()?; #[allow(clippy::arc_with_non_send_sync)] let connection = Connection { inner: Arc::new(Mutex::new(conn)), }; Ok(connection) } } pub struct Connection { inner: Arc>>, } impl Clone for Connection { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), } } } unsafe impl Send for Connection {} unsafe impl Sync for Connection {} impl Connection { pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result { let mut stmt = self.prepare(sql).await?; stmt.query(params).await } pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result { let mut stmt = self.prepare(sql).await?; stmt.execute(params).await } pub async fn prepare(&self, sql: &str) -> Result { let conn = self .inner .lock() .map_err(|e| Error::MutexError(e.to_string()))?; let stmt = conn.prepare(sql)?; #[allow(clippy::arc_with_non_send_sync)] let statement = Statement { inner: Arc::new(Mutex::new(stmt)), }; Ok(statement) } pub fn pragma_query(&self, pragma_name: &str, mut f: F) -> Result<()> where F: FnMut(&Row) -> limbo_core::Result<()>, { let conn = self .inner .lock() .map_err(|e| Error::MutexError(e.to_string()))?; let rows: Vec = conn .pragma_query(pragma_name) .map_err(|e| Error::SqlExecutionFailure(e.to_string()))? .iter() .map(|row| row.iter().collect::()) .collect(); rows.iter().try_for_each(|row| { f(row).map_err(|e| { Error::SqlExecutionFailure(format!("Error executing user defined function: {}", e)) }) })?; Ok(()) } } pub struct Statement { inner: Arc>, } impl Clone for Statement { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), } } } unsafe impl Send for Statement {} unsafe impl Sync for Statement {} impl Statement { pub async fn query(&mut self, params: impl IntoParams) -> Result { let params = params.into_params()?; match params { params::Params::None => (), params::Params::Positional(values) => { for (i, value) in values.into_iter().enumerate() { let mut stmt = self.inner.lock().unwrap(); stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into()); } } params::Params::Named(_items) => todo!(), } #[allow(clippy::arc_with_non_send_sync)] let rows = Rows { inner: Arc::clone(&self.inner), }; Ok(rows) } pub async fn execute(&mut self, params: impl IntoParams) -> Result { { // Reset the statement before executing self.inner.lock().unwrap().reset(); } let params = params.into_params()?; match params { params::Params::None => (), params::Params::Positional(values) => { for (i, value) in values.into_iter().enumerate() { let mut stmt = self.inner.lock().unwrap(); stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into()); } } params::Params::Named(_items) => todo!(), } loop { let mut stmt = self.inner.lock().unwrap(); match stmt.step() { Ok(limbo_core::StepResult::Row) => { // unexpected row during execution, error out. return Ok(2); } Ok(limbo_core::StepResult::Done) => { return Ok(0); } Ok(limbo_core::StepResult::IO) => { let _ = stmt.run_once(); //return Ok(1); } Ok(limbo_core::StepResult::Busy) => { return Ok(4); } Ok(limbo_core::StepResult::Interrupt) => { return Ok(3); } Err(err) => { return Err(err.into()); } } } } pub fn columns(&self) -> Vec { let stmt = self.inner.lock().unwrap(); let n = stmt.num_columns(); let mut cols = Vec::with_capacity(n); for i in 0..n { let name = stmt.get_column_name(i).into_owned(); cols.push(Column { name, decl_type: None, // TODO }); } cols } } pub struct Column { name: String, decl_type: Option, } impl Column { pub fn name(&self) -> &str { &self.name } pub fn decl_type(&self) -> Option<&str> { self.decl_type.as_deref() } } pub trait IntoValue { fn into_value(self) -> Result; } #[derive(Debug, Clone)] pub enum Params { None, Positional(Vec), Named(Vec<(String, Value)>), } pub struct Transaction {} pub struct Rows { inner: Arc>, } impl Clone for Rows { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), } } } unsafe impl Send for Rows {} unsafe impl Sync for Rows {} impl Rows { pub async fn next(&mut self) -> Result> { let mut stmt = self .inner .lock() .map_err(|e| Error::MutexError(e.to_string()))?; match stmt.step() { Ok(limbo_core::StepResult::Row) => { let row = stmt.row().unwrap(); Ok(Some(Row { values: row.get_values().map(|v| v.to_owned()).collect(), })) } _ => Ok(None), } } } #[derive(Debug)] pub struct Row { values: Vec, } unsafe impl Send for Row {} unsafe impl Sync for Row {} impl Row { pub fn get_value(&self, index: usize) -> Result { let value = &self.values[index]; match value { limbo_core::Value::Integer(i) => Ok(Value::Integer(*i)), limbo_core::Value::Null => Ok(Value::Null), limbo_core::Value::Float(f) => Ok(Value::Real(*f)), limbo_core::Value::Text(text) => Ok(Value::Text(text.to_string())), limbo_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())), } } pub fn column_count(&self) -> usize { self.values.len() } } impl<'a> FromIterator<&'a limbo_core::Value> for Row { fn from_iter>(iter: T) -> Self { let values = iter .into_iter() .map(|v| match v { limbo_core::Value::Integer(i) => limbo_core::Value::Integer(*i), limbo_core::Value::Null => limbo_core::Value::Null, limbo_core::Value::Float(f) => limbo_core::Value::Float(*f), limbo_core::Value::Text(s) => limbo_core::Value::Text(s.clone()), limbo_core::Value::Blob(b) => limbo_core::Value::Blob(b.clone()), }) .collect(); Row { values } } } #[cfg(test)] mod tests { use super::*; use tempfile::NamedTempFile; #[tokio::test] async fn test_database_persistence() -> Result<()> { let temp_file = NamedTempFile::new().unwrap(); let db_path = temp_file.path().to_str().unwrap(); // First, create the database, a table, and insert some data { let db = Builder::new_local(db_path).build().await?; let conn = db.connect()?; conn.execute( "CREATE TABLE test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);", (), ) .await?; conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ()) .await?; conn.execute("INSERT INTO test_persistence (name) VALUES ('Bob');", ()) .await?; } // db and conn are dropped here, simulating closing // Now, re-open the database and check if the data is still there let db = Builder::new_local(db_path).build().await?; let conn = db.connect()?; let mut rows = conn .query("SELECT name FROM test_persistence ORDER BY id;", ()) .await?; let row1 = rows.next().await?.expect("Expected first row"); assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string())); let row2 = rows.next().await?.expect("Expected second row"); assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string())); assert!(rows.next().await?.is_none(), "Expected no more rows"); Ok(()) } #[tokio::test] async fn test_database_persistence_many_frames() -> Result<()> { let temp_file = NamedTempFile::new().unwrap(); let db_path = temp_file.path().to_str().unwrap(); const NUM_INSERTS: usize = 100; const TARGET_STRING_LEN: usize = 1024; // 1KB let mut original_data = Vec::with_capacity(NUM_INSERTS); for i in 0..NUM_INSERTS { let prefix = format!("test_string_{:04}_", i); let padding_len = TARGET_STRING_LEN.saturating_sub(prefix.len()); let padding: String = "A".repeat(padding_len); original_data.push(format!("{}{}", prefix, padding)); } // First, create the database, a table, and insert many large strings { let db = Builder::new_local(db_path).build().await?; let conn = db.connect()?; conn.execute( "CREATE TABLE test_large_persistence (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT NOT NULL);", (), ) .await?; for data_val in &original_data { conn.execute( "INSERT INTO test_large_persistence (data) VALUES (?);", params::Params::Positional(vec![Value::Text(data_val.clone())]), ) .await?; } } // db and conn are dropped here, simulating closing // Now, re-open the database and check if the data is still there let db = Builder::new_local(db_path).build().await?; let conn = db.connect()?; let mut rows = conn .query("SELECT data FROM test_large_persistence ORDER BY id;", ()) .await?; for i in 0..NUM_INSERTS { let row = rows .next() .await? .unwrap_or_else(|| panic!("Expected row {} but found None", i)); assert_eq!( row.get_value(0)?, Value::Text(original_data[i].clone()), "Mismatch in retrieved data for row {}", i ); } assert!( rows.next().await?.is_none(), "Expected no more rows after retrieving all inserted data" ); // Delete the WAL file only and try to re-open and query let wal_path = format!("{}-wal", db_path); std::fs::remove_file(&wal_path) .map_err(|e| eprintln!("Warning: Failed to delete WAL file for test: {}", e)) .unwrap(); // Attempt to re-open the database after deleting WAL and assert that table is missing. let db_after_wal_delete = Builder::new_local(db_path).build().await?; let conn_after_wal_delete = db_after_wal_delete.connect()?; let query_result_after_wal_delete = conn_after_wal_delete .query("SELECT data FROM test_large_persistence ORDER BY id;", ()) .await; match query_result_after_wal_delete { Ok(_) => panic!("Query succeeded after WAL deletion and DB reopen, but was expected to fail because the table definition should have been in the WAL."), Err(Error::SqlExecutionFailure(msg)) => { assert!( msg.contains("test_large_persistence not found"), "Expected 'test_large_persistence not found' error, but got: {}", msg ); } Err(e) => panic!( "Expected SqlExecutionFailure for 'no such table', but got a different error: {:?}", e ), } Ok(()) } #[tokio::test] async fn test_database_persistence_write_one_frame_many_times() -> Result<()> { let temp_file = NamedTempFile::new().unwrap(); let db_path = temp_file.path().to_str().unwrap(); for i in 0..100 { { let db = Builder::new_local(db_path).build().await?; let conn = db.connect()?; conn.execute("CREATE TABLE IF NOT EXISTS test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);", ()).await?; conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ()) .await?; } { let db = Builder::new_local(db_path).build().await?; let conn = db.connect()?; let mut rows_iter = conn .query("SELECT count(*) FROM test_persistence;", ()) .await?; let rows = rows_iter.next().await?.unwrap(); assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1)); assert!(rows_iter.next().await?.is_none()); } } Ok(()) } }