diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index a5ad02e51..f95c88b3f 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -2842,4 +2842,117 @@ pub mod test {
             );
         }
     }
+
+    #[test]
+    fn test_wal_full_backfills_all() {
+        let (db, _tmp) = get_database();
+        let conn = db.connect().unwrap();
+
+        // Write some data to put frames in the WAL
+        conn.execute("create table test(id integer primary key, value text)")
+            .unwrap();
+        bulk_inserts(&conn, 8, 4);
+
+        // Ensure frames are flushed to the WAL
+        while let IOResult::IO = conn.pager.borrow_mut().cacheflush().unwrap() {
+            conn.run_once().unwrap();
+        }
+
+        // Snapshot the current mxFrame before running FULL
+        let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone();
+        let mx_before = unsafe { (&*wal_shared.get()).max_frame.load(Ordering::SeqCst) };
+        assert!(mx_before > 0, "expected frames in WAL before FULL");
+
+        // Run a FULL checkpoint: it must backfill *all* frames up to mx_before
+        let result = {
+            let pager = conn.pager.borrow();
+            let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
+            run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full)
+        };
+
+        assert_eq!(result.num_wal_frames, mx_before);
+        assert_eq!(result.num_checkpointed_frames, mx_before);
+
+        // The checkpoint result may still hold a guard for post-backfill truncation/sync.
+        // Release it here so the test does not keep those locks held.
+        let mut result = result;
+        result.release_guard();
+    }
+
+    #[test]
+    fn test_wal_full_waits_for_old_reader_then_succeeds() {
+        let (db, _tmp) = get_database();
+        let writer = db.connect().unwrap();
+        let reader = db.connect().unwrap();
+
+        writer
+            .execute("create table test(id integer primary key, value text)")
+            .unwrap();
+
+        // First commit some data and flush (the reader will snapshot here)
+        bulk_inserts(&writer, 2, 3);
+        while let IOResult::IO = writer.pager.borrow_mut().cacheflush().unwrap() {
+            writer.run_once().unwrap();
+        }
+
+        // Start a read transaction pinned at the current snapshot
+        {
+            let pager = reader.pager.borrow_mut();
+            let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
+            let (res, _) = wal.begin_read_tx().unwrap();
+            assert!(matches!(res, LimboResult::Ok));
+        }
+        let r_snapshot = {
+            let pager = reader.pager.borrow();
+            let wal = pager.wal.as_ref().unwrap().borrow();
+            wal.get_max_frame()
+        };
+
+        // Advance the WAL beyond the reader's snapshot
+        bulk_inserts(&writer, 3, 4);
+        while let IOResult::IO = writer.pager.borrow_mut().cacheflush().unwrap() {
+            writer.run_once().unwrap();
+        }
+        let mx_now = unsafe {
+            (&*db.maybe_shared_wal.read().as_ref().unwrap().get())
+                .max_frame
+                .load(Ordering::SeqCst)
+        };
+        assert!(mx_now > r_snapshot);
+
+        // FULL must return Busy while a reader is stuck behind
+        {
+            let pager = writer.pager.borrow();
+            let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
+            match wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Full) {
+                Ok(IOResult::IO) => {
+                    // Drive any pending IO (should quickly become Busy or Done)
+                    writer.run_once().unwrap();
+                    // Call again to observe the final state
+                    match wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Full) {
+                        Err(LimboError::Busy) => {}
+                        other => panic!("expected Busy from FULL with old reader, got {other:?}"),
+                    }
+                }
+                Err(LimboError::Busy) => {}
+                other => panic!("expected Busy from FULL with old reader, got {other:?}"),
+            }
+        }
+
+        // Release the reader; a FULL checkpoint should now succeed and backfill everything
+        {
+            let pager = reader.pager.borrow();
+            let wal = pager.wal.as_ref().unwrap().borrow();
+            wal.end_read_tx();
+        }
+
+        let result = {
+            let pager = writer.pager.borrow();
+            let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
+            run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full)
+        };
+
+        assert_eq!(result.num_wal_frames, mx_now);
+        assert_eq!(result.num_checkpointed_frames, mx_now);
+    }
 }