diff --git a/core/lib.rs b/core/lib.rs index 736e6820c..d992d03a7 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -47,6 +47,7 @@ pub use error::LimboError; use translate::select::prepare_select_plan; pub type Result = std::result::Result; +use crate::storage::wal::CheckpointResult; use crate::translate::optimizer::optimize_plan; pub use io::OpenFlags; pub use io::PlatformIO; @@ -61,7 +62,6 @@ pub use storage::pager::Page; pub use storage::pager::Pager; pub use storage::wal::CheckpointStatus; pub use storage::wal::Wal; -use crate::storage::wal::CheckpointResult; pub static DATABASE_VERSION: OnceLock = OnceLock::new(); @@ -395,9 +395,9 @@ impl Connection { Ok(()) } - pub fn checkpoint(&self) -> Result<(CheckpointResult)> { - self.pager.clear_page_cache(); - Ok(()) + pub fn checkpoint(&self) -> Result { + let checkpoint_result = self.pager.clear_page_cache(); + Ok(checkpoint_result) } #[cfg(not(target_family = "wasm"))] @@ -410,7 +410,7 @@ impl Connection { loop { // TODO: make this async? match self.pager.checkpoint()? { - CheckpointStatus::Done => { + CheckpointStatus::Done(_) => { return Ok(()); } CheckpointStatus::IO => { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 559e872ae..ad3acd956 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -2,7 +2,7 @@ use crate::result::LimboResult; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent}; -use crate::storage::wal::Wal; +use crate::storage::wal::{CheckpointResult, Wal}; use crate::{Buffer, Result}; use log::trace; use std::cell::{RefCell, UnsafeCell}; @@ -207,12 +207,14 @@ impl Pager { } pub fn end_tx(&self) -> Result { - match self.cacheflush()? { - CheckpointStatus::Done => {} - CheckpointStatus::IO => return Ok(CheckpointStatus::IO), - }; - self.wal.borrow().end_read_tx()?; - Ok(CheckpointStatus::Done) + let checkpoint_status = self.cacheflush()?; + match checkpoint_status { + CheckpointStatus::IO => Ok(checkpoint_status), + CheckpointStatus::Done(_) => { + self.wal.borrow().end_read_tx()?; + Ok(checkpoint_status) + } + } } /// Reads a page from the database. @@ -301,6 +303,7 @@ impl Pager { } pub fn cacheflush(&self) -> Result { + let mut checkpoint_result = CheckpointResult::new(); loop { let state = self.flush_info.borrow().state.clone(); match state { @@ -334,7 +337,7 @@ impl Pager { FlushState::SyncWal => { match self.wal.borrow_mut().sync() { Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO), - Ok(CheckpointStatus::Done) => {} + Ok(CheckpointStatus::Done(res)) => checkpoint_result = res, Err(e) => return Err(e), } @@ -348,7 +351,8 @@ impl Pager { } FlushState::Checkpoint => { match self.checkpoint()? { - CheckpointStatus::Done => { + CheckpointStatus::Done(res) => { + checkpoint_result = res; self.flush_info.borrow_mut().state = FlushState::SyncDbFile; } CheckpointStatus::IO => return Ok(CheckpointStatus::IO), @@ -368,10 +372,11 @@ impl Pager { } } } - Ok(CheckpointStatus::Done) + Ok(CheckpointStatus::Done(checkpoint_result)) } pub fn checkpoint(&self) -> Result { + let mut checkpoint_result = CheckpointResult::new(); loop { let state = self.checkpoint_state.borrow().clone(); trace!("pager_checkpoint(state={:?})", state); @@ -384,7 +389,8 @@ impl Pager { CheckpointMode::Passive, )? { CheckpointStatus::IO => return Ok(CheckpointStatus::IO), - CheckpointStatus::Done => { + CheckpointStatus::Done(res) => { + checkpoint_result = res; self.checkpoint_state.replace(CheckpointState::SyncDbFile); } }; @@ -408,7 +414,7 @@ impl Pager { Ok(CheckpointStatus::IO) } else { self.checkpoint_state.replace(CheckpointState::Checkpoint); - Ok(CheckpointStatus::Done) + Ok(CheckpointStatus::Done(checkpoint_result)) }; } } @@ -416,7 +422,8 @@ impl Pager { } // WARN: used for testing purposes - pub fn clear_page_cache(&self) { + pub fn clear_page_cache(&self) -> CheckpointResult { + let checkpoint_result: CheckpointResult; loop { match self.wal.borrow_mut().checkpoint( self, @@ -426,7 +433,8 @@ impl Pager { Ok(CheckpointStatus::IO) => { let _ = self.io.run_once(); } - Ok(CheckpointStatus::Done) => { + Ok(CheckpointStatus::Done(res)) => { + checkpoint_result = res; break; } Err(err) => panic!("error while clearing cache {}", err), @@ -434,6 +442,7 @@ impl Pager { } // TODO: only clear cache of things that are really invalidated self.page_cache.write().unwrap().clear(); + checkpoint_result } /* diff --git a/core/storage/wal.rs b/core/storage/wal.rs index deda89685..c514eae84 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,9 +1,9 @@ +use log::{debug, trace}; use std::collections::HashMap; +use std::fmt::Formatter; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::RwLock; -use std::{cell::RefCell, rc::Rc, sync::Arc}; - -use log::{debug, trace}; +use std::{cell::RefCell, fmt, rc::Rc, sync::Arc}; use crate::io::{File, SyncCompletion, IO}; use crate::result::LimboResult; @@ -27,8 +27,19 @@ pub const WRITE_LOCK: u32 = 2; #[derive(Debug)] pub struct CheckpointResult { - /// number of pages moved successfully from WAL to db file after checkpoint - nbackfills: u64, + /// number of frames in WAL + pub num_wal_frames: u64, + /// number of frames moved successfully from WAL to db file after checkpoint + pub num_checkpointed_frames: u64, +} + +impl CheckpointResult { + pub fn new() -> Self { + Self { + num_wal_frames: 0, + num_checkpointed_frames: 0, + } + } } #[derive(Debug)] @@ -165,7 +176,7 @@ pub trait Wal { // Syncing requires a state machine because we need to schedule a sync and then wait until it is // finished. If we don't wait there will be undefined behaviour that no one wants to debug. -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Debug)] enum SyncState { NotSyncing, Syncing, @@ -182,7 +193,7 @@ pub enum CheckpointState { } pub enum CheckpointStatus { - Done, + Done(CheckpointResult), IO, } @@ -202,6 +213,17 @@ struct OngoingCheckpoint { current_page: u64, } +impl fmt::Debug for OngoingCheckpoint { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("OngoingCheckpoint") + .field("state", &self.state) + .field("min_frame", &self.min_frame) + .field("max_frame", &self.max_frame) + .field("current_page", &self.current_page) + .finish() + } +} + #[allow(dead_code)] pub struct WalFile { io: Arc, @@ -224,6 +246,23 @@ pub struct WalFile { min_frame: u64, } +impl fmt::Debug for WalFile { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WalFile") + .field("sync_state", &self.sync_state) + .field("syncing", &self.syncing) + .field("page_size", &self.page_size) + .field("shared", &self.shared) + .field("ongoing_checkpoint", &self.ongoing_checkpoint) + .field("checkpoint_threshold", &self.checkpoint_threshold) + .field("max_frame_read_lock_index", &self.max_frame_read_lock_index) + .field("max_frame", &self.max_frame) + .field("min_frame", &self.min_frame) + // Excluding other fields + .finish() + } +} + // TODO(pere): lock only important parts + pin WalFileShared /// WalFileShared is the part of a WAL that will be shared between threads. A wal has information /// that needs to be communicated between threads so this struct does the job. @@ -254,6 +293,21 @@ pub struct WalFileShared { write_lock: LimboRwLock, } +impl fmt::Debug for WalFileShared { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WalFileShared") + .field("wal_header", &self.wal_header) + .field("min_frame", &self.min_frame) + .field("max_frame", &self.max_frame) + .field("nbackfills", &self.nbackfills) + .field("frame_cache", &self.frame_cache) + .field("pages_in_frames", &self.pages_in_frames) + .field("last_checksum", &self.last_checksum) + // Excluding `file`, `read_locks`, and `write_lock` + .finish() + } +} + impl Wal for WalFile { /// Begin a read transaction. fn begin_read_tx(&mut self) -> Result { @@ -534,6 +588,13 @@ impl Wal for WalFile { return Ok(CheckpointStatus::IO); } let mut shared = self.shared.write().unwrap(); + + // Record two num pages fields to return as checkpoint result to caller. + // Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html + let checkpoint_result = CheckpointResult { + num_wal_frames: shared.max_frame, + num_checkpointed_frames: self.ongoing_checkpoint.max_frame, + }; let everything_backfilled = shared.max_frame == self.ongoing_checkpoint.max_frame; if everything_backfilled { @@ -547,7 +608,7 @@ impl Wal for WalFile { shared.nbackfills = self.ongoing_checkpoint.max_frame; } self.ongoing_checkpoint.state = CheckpointState::Start; - return Ok(CheckpointStatus::Done); + return Ok(CheckpointStatus::Done(checkpoint_result)); } } } @@ -578,7 +639,11 @@ impl Wal for WalFile { Ok(CheckpointStatus::IO) } else { self.sync_state.replace(SyncState::NotSyncing); - Ok(CheckpointStatus::Done) + let checkpoint_result = CheckpointResult { + num_wal_frames: self.max_frame, + num_checkpointed_frames: self.ongoing_checkpoint.max_frame, + }; + Ok(CheckpointStatus::Done(checkpoint_result)) } } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 16156347a..5082516f7 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -32,6 +32,7 @@ use crate::info; use crate::pseudo::PseudoCursor; use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::DatabaseHeader; +use crate::storage::wal::CheckpointResult; use crate::storage::{btree::BTreeCursor, pager::Pager}; use crate::types::{ AggContext, Cursor, CursorResult, ExternalAggState, OwnedRecord, OwnedValue, Record, SeekKey, @@ -137,6 +138,7 @@ pub type PageIdx = usize; // Index of insn in list of insns type InsnReference = u32; +#[derive(Debug)] pub enum StepResult<'a> { Done, IO, @@ -468,15 +470,18 @@ impl Program { } => { let result = self.connection.upgrade().unwrap().checkpoint(); match result { - Ok(()) => { + Ok(CheckpointResult { + num_wal_frames: num_wal_pages, + num_checkpointed_frames: num_checkpointed_pages, + }) => { // https://sqlite.org/pragma.html#pragma_wal_checkpoint - // TODO make 2nd and 3rd cols available through checkpoint method // 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy). state.registers[*dest] = OwnedValue::Integer(0); // 2nd col: # modified pages written to wal file - state.registers[*dest + 1] = OwnedValue::Integer(0); + state.registers[*dest + 1] = OwnedValue::Integer(num_wal_pages as i64); // 3rd col: # pages moved to db after checkpoint - state.registers[*dest + 2] = OwnedValue::Integer(0); + state.registers[*dest + 2] = + OwnedValue::Integer(num_checkpointed_pages as i64); } Err(_err) => state.registers[*dest] = OwnedValue::Integer(1), } @@ -1015,7 +1020,7 @@ impl Program { return if self.auto_commit { match pager.end_tx() { Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO), - Ok(crate::storage::wal::CheckpointStatus::Done) => { + Ok(crate::storage::wal::CheckpointStatus::Done(_)) => { if self.change_cnt_on { if let Some(conn) = self.connection.upgrade() { conn.set_changes(self.n_change.get()); diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 9fbb9eb41..0e567d0fd 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -13,13 +13,17 @@ pub struct TempDatabase { #[allow(dead_code, clippy::arc_with_non_send_sync)] impl TempDatabase { pub fn new_empty() -> Self { - let mut path = TempDir::new().unwrap().into_path(); - path.push("test.db"); - let io: Arc = Arc::new(limbo_core::PlatformIO::new().unwrap()); + Self::new("test.db") + } + pub fn new(db_name: &str) -> Self { + let mut path = TempDir::new().unwrap().into_path(); + path.push(db_name); + let io: Arc = Arc::new(limbo_core::PlatformIO::new().unwrap()); Self { path, io } } - pub fn new(table_sql: &str) -> Self { + + pub fn new_with_rusqlite(table_sql: &str) -> Self { let mut path = TempDir::new().unwrap().into_path(); path.push("test.db"); { @@ -47,7 +51,7 @@ impl TempDatabase { pub(crate) fn do_flush(conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result<()> { loop { match conn.cacheflush()? { - CheckpointStatus::Done => { + CheckpointStatus::Done(_) => { break; } CheckpointStatus::IO => { @@ -82,8 +86,9 @@ mod tests { #[test] fn test_statement_columns() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = - TempDatabase::new("create table test (foo integer, bar integer, baz integer);"); + let tmp_db = TempDatabase::new_with_rusqlite( + "create table test (foo integer, bar integer, baz integer);", + ); let conn = tmp_db.connect_limbo(); let stmt = conn.prepare("select * from test;")?; diff --git a/tests/integration/functions/test_function_rowid.rs b/tests/integration/functions/test_function_rowid.rs index 72045e2ab..03edf4c44 100644 --- a/tests/integration/functions/test_function_rowid.rs +++ b/tests/integration/functions/test_function_rowid.rs @@ -4,7 +4,9 @@ use limbo_core::{StepResult, Value}; #[test] fn test_last_insert_rowid_basic() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("CREATE TABLE test_rowid (id INTEGER PRIMARY KEY, val TEXT);"); + let tmp_db = TempDatabase::new_with_rusqlite( + "CREATE TABLE test_rowid (id INTEGER PRIMARY KEY, val TEXT);", + ); let conn = tmp_db.connect_limbo(); // Simple insert @@ -85,7 +87,8 @@ fn test_last_insert_rowid_basic() -> anyhow::Result<()> { #[test] fn test_integer_primary_key() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("CREATE TABLE test_rowid (id INTEGER PRIMARY KEY);"); + let tmp_db = + TempDatabase::new_with_rusqlite("CREATE TABLE test_rowid (id INTEGER PRIMARY KEY);"); let conn = tmp_db.connect_limbo(); for query in &[ diff --git a/tests/integration/mod.rs b/tests/integration/mod.rs index 01bb224dd..9d29ef35c 100644 --- a/tests/integration/mod.rs +++ b/tests/integration/mod.rs @@ -3,3 +3,4 @@ mod functions; mod fuzz; mod pragma; mod query_processing; +mod wal; diff --git a/tests/integration/query_processing/test_read_path.rs b/tests/integration/query_processing/test_read_path.rs index 55f72c1cc..3d84ed649 100644 --- a/tests/integration/query_processing/test_read_path.rs +++ b/tests/integration/query_processing/test_read_path.rs @@ -4,7 +4,7 @@ use limbo_core::{StepResult, Value}; #[test] fn test_statement_reset_bind() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("create table test (i integer);"); + let tmp_db = TempDatabase::new_with_rusqlite("create table test (i integer);"); let conn = tmp_db.connect_limbo(); let mut stmt = conn.prepare("select ?")?; @@ -41,7 +41,7 @@ fn test_statement_reset_bind() -> anyhow::Result<()> { #[test] fn test_statement_bind() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("create table test (i integer);"); + let tmp_db = TempDatabase::new_with_rusqlite("create table test (i integer);"); let conn = tmp_db.connect_limbo(); let mut stmt = conn.prepare("select ?, ?1, :named, ?3, ?4")?; diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index 50d159a96..4508ecbad 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -7,7 +7,8 @@ use std::rc::Rc; #[test] fn test_simple_overflow_page() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);"); + let tmp_db = + TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);"); let conn = tmp_db.connect_limbo(); let mut huge_text = String::new(); @@ -75,7 +76,8 @@ fn test_simple_overflow_page() -> anyhow::Result<()> { #[test] fn test_sequential_overflow_page() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);"); + let tmp_db = + TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);"); let conn = tmp_db.connect_limbo(); let iterations = 10_usize; @@ -152,7 +154,7 @@ fn test_sequential_overflow_page() -> anyhow::Result<()> { fn test_sequential_write() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);"); + let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY);"); let conn = tmp_db.connect_limbo(); let list_query = "SELECT * FROM test"; @@ -219,7 +221,7 @@ fn test_sequential_write() -> anyhow::Result<()> { /// https://github.com/tursodatabase/limbo/pull/679 fn test_regression_multi_row_insert() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("CREATE TABLE test (x REAL);"); + let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x REAL);"); let conn = tmp_db.connect_limbo(); let insert_query = "INSERT INTO test VALUES (-2), (-3), (-1)"; @@ -284,7 +286,7 @@ fn test_regression_multi_row_insert() -> anyhow::Result<()> { #[test] fn test_statement_reset() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("create table test (i integer);"); + let tmp_db = TempDatabase::new_with_rusqlite("create table test (i integer);"); let conn = tmp_db.connect_limbo(); conn.execute("insert into test values (1)")?; @@ -323,7 +325,7 @@ fn test_statement_reset() -> anyhow::Result<()> { #[ignore] fn test_wal_checkpoint() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);"); + let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY);"); // threshold is 1000 by default let iterations = 1001_usize; let conn = tmp_db.connect_limbo(); @@ -386,7 +388,7 @@ fn test_wal_checkpoint() -> anyhow::Result<()> { #[test] fn test_wal_restart() -> anyhow::Result<()> { let _ = env_logger::try_init(); - let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);"); + let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY);"); // threshold is 1000 by default fn insert(i: usize, conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result<()> { diff --git a/tests/integration/wal/mod.rs b/tests/integration/wal/mod.rs new file mode 100644 index 000000000..680831e9b --- /dev/null +++ b/tests/integration/wal/mod.rs @@ -0,0 +1 @@ +mod test_wal; diff --git a/tests/integration/wal/test_wal.rs b/tests/integration/wal/test_wal.rs new file mode 100644 index 000000000..fa320ae8f --- /dev/null +++ b/tests/integration/wal/test_wal.rs @@ -0,0 +1,90 @@ +use crate::common::{do_flush, TempDatabase}; +use limbo_core::{Connection, LimboError, Result, StepResult, Value}; +use std::cell::RefCell; +use std::rc::Rc; + +#[allow(clippy::arc_with_non_send_sync)] +#[test] +fn test_wal_checkpoint_result() -> Result<()> { + let tmp_db = TempDatabase::new("test_wal.db"); + let conn = tmp_db.connect_limbo(); + conn.execute("CREATE TABLE t1 (id text);")?; + + let res = execute_and_get_strings(&tmp_db, &conn, "pragma journal_mode;")?; + assert_eq!(res, vec!["wal"]); + + conn.execute("insert into t1(id) values (1), (2);")?; + do_flush(&conn, &tmp_db).unwrap(); + conn.execute("select * from t1;")?; + do_flush(&conn, &tmp_db).unwrap(); + + // checkpoint result should return > 0 num pages now as database has data + let res = execute_and_get_ints(&tmp_db, &conn, "pragma wal_checkpoint;")?; + println!("'pragma wal_checkpoint;' returns: {res:?}"); + assert_eq!(res.len(), 3); + assert_eq!(res[0], 0); // checkpoint successfully + assert!(res[1] > 0); // num pages in wal + assert!(res[2] > 0); // num pages checkpointed successfully + + Ok(()) +} + +/// Execute a statement and get strings result +pub(crate) fn execute_and_get_strings( + tmp_db: &TempDatabase, + conn: &Rc, + sql: &str, +) -> Result> { + let statement = conn.prepare(sql)?; + let stmt = Rc::new(RefCell::new(statement)); + let mut result = Vec::new(); + + while let Ok(step_result) = stmt.borrow_mut().step() { + match step_result { + StepResult::Row(row) => { + for el in &row.values { + result.push(format!("{el}")); + } + } + StepResult::Done => break, + StepResult::Interrupt => break, + StepResult::IO => tmp_db.io.run_once()?, + StepResult::Busy => tmp_db.io.run_once()?, + } + } + Ok(result) +} + +/// Execute a statement and get integers +pub(crate) fn execute_and_get_ints( + tmp_db: &TempDatabase, + conn: &Rc, + sql: &str, +) -> Result> { + let statement = conn.prepare(sql)?; + let stmt = Rc::new(RefCell::new(statement)); + let mut result = Vec::new(); + + while let Ok(step_result) = stmt.borrow_mut().step() { + match step_result { + StepResult::Row(row) => { + for value in &row.values { + let out = match value { + Value::Integer(i) => *i, + _ => { + return Err(LimboError::ConversionError(format!( + "cannot convert {value} to int" + ))) + } + }; + result.push(out); + } + } + StepResult::Done => break, + StepResult::Interrupt => break, + StepResult::IO => tmp_db.io.run_once()?, + StepResult::Busy => tmp_db.io.run_once()?, + } + } + Ok(result) +}