diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 30dc3b4ca..e9ad03cdb 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -482,7 +482,7 @@ pub struct WalFileShared { /// read_locks is a list of read locks that can coexist with the max_frame number stored in /// value. There is a limited amount because and unbounded amount of connections could be /// fatal. Therefore, for now we copy how SQLite behaves with limited amounts of read max - /// frames that is equal to 5. + /// frames that is equal to 5 pub read_locks: [LimboRwLock; 5], /// There is only one write allowed in WAL mode. This lock takes care of ensuring there is only /// one used. @@ -523,7 +523,7 @@ impl CheckpointGuard { return Err(LimboError::Busy); } match mode { - CheckpointMode::Passive | CheckpointMode::Full => { + CheckpointMode::Passive => { let read0 = &mut shared.read_locks[0]; if !read0.write() { shared.checkpoint_lock.unlock(); @@ -542,6 +542,7 @@ impl CheckpointGuard { } Ok(Self::Writer { ptr }) } + _ => todo!("not implemented yet"), } } } @@ -912,6 +913,11 @@ impl Wal for WalFile { write_counter: Rc>, mode: CheckpointMode, ) -> Result> { + if matches!(mode, CheckpointMode::Full) { + return Err(LimboError::InternalError( + "Full checkpoint mode is not implemented yet".into(), + )); + } 'checkpoint_loop: loop { let state = self.ongoing_checkpoint.state; tracing::debug!(?state); @@ -930,9 +936,7 @@ impl Wal for WalFile { ) }; let needs_backfill = shared_max > nbackfills; - if !needs_backfill - && matches!(mode, CheckpointMode::Passive | CheckpointMode::Full) - { + if !needs_backfill && matches!(mode, CheckpointMode::Passive) { // there are no frames to copy and we don't need to reset the log so we can // return early success. self.prev_checkpoint.release_guard(); // just in case we are still holding @@ -1029,9 +1033,9 @@ impl Wal for WalFile { self.ongoing_checkpoint.state = CheckpointState::Done; } } - // All eligible frames copied to the db file. - // Update nBackfills. - // In Reset or Truncate mode, we need to restart and possibly truncate the log. + // All eligible frames copied to the db file + // Update nBackfills + // In Restart or Truncate mode, we need to restart the log over and possibly truncate the file // Release all locks and return the current num of wal frames and the amount we backfilled CheckpointState::Done => { if *write_counter.borrow() > 0 { @@ -1369,13 +1373,11 @@ impl WalFile { fn acquire_proper_checkpoint_guard(&mut self, mode: CheckpointMode) -> Result<()> { let needs_new_guard = !matches!( (&self.checkpoint_guard, mode), - ( - Some(CheckpointGuard::Read0 { .. }), - CheckpointMode::Passive | CheckpointMode::Full, - ) | ( - Some(CheckpointGuard::Writer { .. }), - CheckpointMode::Restart | CheckpointMode::Truncate, - ), + (Some(CheckpointGuard::Read0 { .. }), CheckpointMode::Passive,) + | ( + Some(CheckpointGuard::Writer { .. }), + CheckpointMode::Restart | CheckpointMode::Truncate, + ), ); if needs_new_guard { // Drop any existing guard @@ -1564,7 +1566,6 @@ pub mod test { }; use std::{ cell::{Cell, RefCell, UnsafeCell}, - os::unix::fs::MetadataExt, rc::Rc, sync::{atomic::Ordering, Arc}, }; @@ -1626,10 +1627,7 @@ pub mod test { let stat = std::fs::metadata(&walpath).unwrap(); let meta_before = std::fs::metadata(&walpath).unwrap(); - #[cfg(not(unix))] let bytes_before = meta_before.len(); - #[cfg(unix)] - let blocks_before = meta_before.blocks(); run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Truncate); drop(wal); @@ -1637,32 +1635,15 @@ pub mod test { tracing::info!("wal filepath: {walpath:?}, size: {}", stat.len()); let meta_after = std::fs::metadata(&walpath).unwrap(); - - #[cfg(unix)] - { - let blocks_after = meta_after.blocks(); - assert_ne!( - blocks_before, blocks_after, - "WAL file should not have been empty before checkpoint" - ); - assert_eq!( - blocks_after, 0, - "WAL file should be truncated to 0 bytes, but is {blocks_after} blocks", - ); - } - #[cfg(not(unix))] - { - let bytes_after = meta_after.len(); - assert_ne!( - bytes_before, bytes_after, - "WAL file should not have been empty before checkpoint" - ); - // On Windows, we check the size in bytes - assert_eq!( - bytes_after, 0, - "WAL file should be truncated to 0 bytes, but is {bytes_after} bytes", - ); - } + let bytes_after = meta_after.len(); + assert_ne!( + bytes_before, bytes_after, + "WAL file should not have been empty before checkpoint" + ); + assert_eq!( + bytes_after, 32, + "WAL file should be truncated to 0 bytes, but is {bytes_after} bytes", + ); std::fs::remove_dir_all(path).unwrap(); } @@ -1693,105 +1674,6 @@ pub mod test { } } - #[test] - fn test_wal_full_checkpoint_mode() { - let (db, path) = get_database(); - let conn = db.connect().unwrap(); - conn.execute("create table test(id integer primary key, value text)") - .unwrap(); - let mut walpath = path.clone().into_os_string().into_string().unwrap(); - walpath.push_str("/test.db-wal"); - let walpath = std::path::PathBuf::from(walpath); - - // Produce multiple WAL frames. - bulk_inserts(&conn, 25, 2); - conn.pager.borrow_mut().cacheflush().unwrap(); - - let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone(); - let (before_max, before_backfills) = unsafe { - let s = &*wal_shared.get(); - ( - s.max_frame.load(Ordering::SeqCst), - s.nbackfills.load(Ordering::SeqCst), - ) - }; - assert!(before_max > 0); - assert_eq!(before_backfills, 0); - - let meta_before = std::fs::metadata(&walpath).unwrap(); - #[cfg(not(unix))] - let bytes_before = meta_before.len(); - #[cfg(unix)] - let blocks_before = meta_before.blocks(); - - // Run FULL checkpoint. - { - let pager_ref = conn.pager.borrow(); - let mut wal = pager_ref.wal.borrow_mut(); - let result = run_checkpoint_until_done(&mut *wal, &pager_ref, CheckpointMode::Full); - assert_eq!(result.num_wal_frames, before_max); - assert_eq!(result.num_checkpointed_frames, before_max); - } - - // Validate state after FULL: max_frame unchanged; nbackfills == max_frame; not restarted. - let (after_max, after_backfills, read_mark0) = unsafe { - let s = &*wal_shared.get(); - ( - s.max_frame.load(Ordering::SeqCst), - s.nbackfills.load(Ordering::SeqCst), - s.read_locks[0].value.load(Ordering::SeqCst), - ) - }; - assert_eq!(after_max, before_max, "Full should not reset WAL sequence"); - assert_eq!(after_backfills, after_max); - assert_eq!( - read_mark0, 0, - "Restart was NOT performed, read mark 0 stays 0" - ); - let meta_after = std::fs::metadata(&walpath).unwrap(); - #[cfg(unix)] - { - let blocks_after = meta_after.blocks(); - assert_eq!( - blocks_before, blocks_after, - "WAL file should not change size after full checkpoint" - ); - } - #[cfg(not(unix))] - { - let bytes_after = meta_after.len(); - assert_eq!( - bytes_before, bytes_after, - "WAL file should not change size after full checkpoint" - ); - } - - // Append another transaction -> frame id should become previous_max + 1 - let prev_max = after_max; - conn.execute("insert into test(value) values ('after_full')") - .unwrap(); - conn.pager - .borrow_mut() - .wal - .borrow_mut() - .finish_append_frames_commit() - .unwrap(); - - let (new_max, _) = unsafe { - let s = &*wal_shared.get(); - ( - s.max_frame.load(Ordering::SeqCst), - s.nbackfills.load(Ordering::SeqCst), - ) - }; - assert_eq!( - new_max, - prev_max + 1, - "WAL should continue appending not restart" - ); - std::fs::remove_dir_all(path).unwrap(); - } - fn wal_header_snapshot(shared: &Arc>) -> (u32, u32, u32, u32) { // (checkpoint_seq, salt1, salt2, page_size) unsafe { @@ -1828,11 +1710,7 @@ pub mod test { let meta_before = std::fs::metadata(&walpath).unwrap(); - #[cfg(not(unix))] let bytes_before = meta_before.len(); - #[cfg(unix)] - let blocks_before = meta_before.blocks(); - { let pager_ref = conn.pager.borrow(); let mut wal = pager_ref.wal.borrow_mut(); @@ -1869,22 +1747,11 @@ pub mod test { assert_eq!(new_max, 1, "Sequence restarted at 1"); let meta_after = std::fs::metadata(&walpath).unwrap(); - #[cfg(unix)] - { - let blocks_after = meta_after.blocks(); - assert_eq!( - blocks_before, blocks_after, - "WAL file should not change size after full checkpoint" - ); - } - #[cfg(not(unix))] - { - let bytes_after = meta_after.len(); - assert_eq!( - bytes_before, bytes_after, - "WAL file should not change size after full checkpoint" - ); - } + let bytes_after = meta_after.len(); + assert_eq!( + bytes_before, bytes_after, + "WAL file should not change size after full checkpoint" + ); std::fs::remove_dir_all(path).unwrap(); } @@ -2015,57 +1882,6 @@ pub mod test { } } - #[test] - fn test_wal_read_lock_slot_0_optimization() { - let (db, _path) = get_database(); - let conn = db.connect().unwrap(); - - conn.execute("create table test(id integer primary key, value text)") - .unwrap(); - conn.execute("insert into test(value) values ('initial')") - .unwrap(); - - // Checkpoint everything - { - let pager = conn.pager.borrow(); - let mut wal = pager.wal.borrow_mut(); - let result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full); - assert!(result.everything_backfilled()); - } - - // Now start a read transaction - should use slot 0 - let conn2 = db.connect().unwrap(); - { - let pager = conn2.pager.borrow_mut(); - let mut wal = pager.wal.borrow_mut(); - assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok)); - - // Verify it's using slot 0 and max_frame is 0 - assert_eq!( - wal.get_max_frame(), - 0, - "Should ignore WAL when using slot 0" - ); - assert_eq!(wal.get_min_frame(), 0); - - // Verify the read lock index - let wal_file = &*wal as *const dyn Wal as *const crate::WalFile; - let wal_file = unsafe { &*wal_file }; - assert_eq!( - wal_file.max_frame_read_lock_index, 0, - "Should be using slot 0" - ); - } - - // Another reader should also be able to use slot 0 (shared lock) - let conn3 = db.connect().unwrap(); - { - let pager = conn3.pager.borrow_mut(); - let mut wal = pager.wal.borrow_mut(); - assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok)); - } - } - #[test] fn test_wal_read_marks_after_restart() { let (db, _path) = get_database(); @@ -2096,9 +1912,9 @@ pub mod test { read_marks_after[1], 0, "Slot 1 (default reader) should be reset to 0" ); - for i in 2..5 { + for (i, item) in read_marks_after.iter().take(5).skip(2).enumerate() { assert_eq!( - read_marks_after[i], READMARK_NOT_USED, + *item, READMARK_NOT_USED, "Slot {i} should be READMARK_NOT_USED after restart", ); } diff --git a/sqlite3/tests/compat/mod.rs b/sqlite3/tests/compat/mod.rs index 9b1b1b56d..700fa6910 100644 --- a/sqlite3/tests/compat/mod.rs +++ b/sqlite3/tests/compat/mod.rs @@ -167,16 +167,17 @@ mod tests { SQLITE_OK ); - assert_eq!( - sqlite3_wal_checkpoint_v2( - db, - ptr::null(), - SQLITE_CHECKPOINT_FULL, - &mut log_size, - &mut checkpoint_count - ), - SQLITE_OK - ); + // TODO: uncomment when SQLITE_CHECKPOINT_FULL is supported + // assert_eq!( + // sqlite3_wal_checkpoint_v2( + // db, + // ptr::null(), + // SQLITE_CHECKPOINT_FULL, + // &mut log_size, + // &mut checkpoint_count + // ), + // SQLITE_OK + // ); assert_eq!( sqlite3_wal_checkpoint_v2( diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index 310acb9ed..fe2e189dd 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -303,40 +303,6 @@ fn test_wal_checkpoint() -> anyhow::Result<()> { Ok(()) } -#[test] -#[ignore] -fn test_wal_checkpoint_truncate() -> anyhow::Result<()> { - let _ = env_logger::try_init(); - let tmp_db = - TempDatabase::new_with_rusqlite("CREATE TABLE test (x INTEGER PRIMARY KEY);", false); - let iterations = 100_usize; - let conn = tmp_db.connect_limbo(); - - for i in 0..iterations { - let insert_query = format!("INSERT INTO test VALUES ({i})"); - do_flush(&conn, &tmp_db)?; - run_query(&tmp_db, &conn, &insert_query)?; - } - conn.checkpoint(CheckpointMode::Truncate)?; - let file = PathBuf::from(format!("{}-wal", tmp_db.path.to_string_lossy())); - assert!(file.exists(), "WAL file should exist after checkpoint"); - let size = file.metadata()?.blocks(); - assert_eq!( - size, 0, - "WAL file should be truncated to a small size, found: {size}", - ); - // assert data is all in table - for i in 0..iterations { - let select_query = format!("SELECT * FROM test WHERE x = {i}"); - run_query_on_row(&tmp_db, &conn, &select_query, |row: &Row| { - let id = row.get::(0).unwrap(); - assert_eq!(i as i64, id); - })?; - } - do_flush(&conn, &tmp_db)?; - Ok(()) -} - #[test] fn test_wal_restart() -> anyhow::Result<()> { let _ = env_logger::try_init();