diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 470bdfbc0..e15cdaf7f 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -731,7 +731,8 @@ impl turso_core::DatabaseStorage for DatabaseFile { len: usize, c: turso_core::Completion, ) -> turso_core::Result { - self.file.truncate(len, c) + let c = self.file.truncate(len, c)?; + Ok(c) } } diff --git a/bindings/wasm/node/src/vfs.cjs b/bindings/wasm/node/src/vfs.cjs deleted file mode 100644 index 2501c512d..000000000 --- a/bindings/wasm/node/src/vfs.cjs +++ /dev/null @@ -1,37 +0,0 @@ -const fs = require('node:fs'); - -class VFS { - constructor() { - } - - open(path, flags) { - return fs.openSync(path, flags); - } - - close(fd) { - fs.closeSync(fd); - } - - pread(fd, buffer, offset) { - return fs.readSync(fd, buffer, 0, buffer.length, offset); - } - - pwrite(fd, buffer, offset) { - return fs.writeSync(fd, buffer, 0, buffer.length, offset); - } - - size(fd) { - let stats = fs.fstatSync(fd); - return BigInt(stats.size); - } - - sync(fd) { - fs.fsyncSync(fd); - } - - truncate(fd, size) { - fs.ftruncateSync(fs, size) - } -} - -module.exports = { VFS }; diff --git a/bindings/wasm/web/src/web-vfs.js b/bindings/wasm/web/src/web-vfs.js deleted file mode 100644 index 21bdf9c0a..000000000 --- a/bindings/wasm/web/src/web-vfs.js +++ /dev/null @@ -1,33 +0,0 @@ -export class VFS { - constructor() { - return self.vfs; - } - - open(path, flags) { - return self.vfs.open(path); - } - - close(fd) { - return self.vfs.close(fd); - } - - pread(fd, buffer, offset) { - return self.vfs.pread(fd, buffer, offset); - } - - pwrite(fd, buffer, offset) { - return self.vfs.pwrite(fd, buffer, offset); - } - - size(fd) { - return self.vfs.size(fd); - } - - sync(fd) { - return self.vfs.sync(fd); - } - - truncate(fd, size) { - return self.vfs.truncate(fd, size) - } -} diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 721313df4..f33c04db3 100644 --- a/core/io/io_uring.rs +++ 
b/core/io/io_uring.rs @@ -226,7 +226,7 @@ impl Clock for UringIO { /// use the callback pointer as the user_data for the operation as is /// common practice for io_uring to prevent more indirection fn get_key(c: Completion) -> u64 { - Arc::into_raw(c.inner) as u64 + Arc::into_raw(c.inner.clone()) as u64 } #[inline(always)] diff --git a/core/storage/database.rs b/core/storage/database.rs index 1ff74c95e..fd2555b59 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -73,7 +73,8 @@ impl DatabaseStorage for DatabaseFile { #[instrument(skip_all, level = Level::INFO)] fn truncate(&self, len: usize, c: Completion) -> Result { - self.file.truncate(len, c) + let c = self.file.truncate(len, c)?; + Ok(c) } } @@ -131,7 +132,8 @@ impl DatabaseStorage for FileMemoryStorage { #[instrument(skip_all, level = Level::INFO)] fn truncate(&self, len: usize, c: Completion) -> Result { - self.file.truncate(len, c) + let c = self.file.truncate(len, c)?; + Ok(c) } } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index d1a91fc09..853797470 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -275,6 +275,9 @@ pub trait Wal { fn get_max_frame(&self) -> u64; fn get_min_frame(&self) -> u64; fn rollback(&mut self) -> Result<()>; + + #[cfg(debug_assertions)] + fn as_any(&self) -> &dyn std::any::Any; } /// A dummy WAL implementation that does nothing. @@ -371,6 +374,10 @@ impl Wal for DummyWAL { fn rollback(&mut self) -> Result<()> { Ok(()) } + #[cfg(debug_assertions)] + fn as_any(&self) -> &dyn std::any::Any { + self + } } // Syncing requires a state machine because we need to schedule a sync and then wait until it is @@ -429,7 +436,6 @@ pub struct WalFile { shared: Arc>, ongoing_checkpoint: OngoingCheckpoint, checkpoint_threshold: usize, - has_snapshot: Cell, // min and max frames for this connection /// This is the index to the read_lock in WalFileShared that we are holding. This lock contains /// the max frame for this connection. 
@@ -660,41 +666,48 @@ impl Drop for CheckpointLocks { } impl Wal for WalFile { - /// Begin a read transaction. + /// Begin a read transaction. The caller must ensure that there is not already + /// an ongoing read transaction. + /// sqlite/src/wal.c 3023 + /// assert(pWal->readLock < 0); /* Not currently locked */ #[instrument(skip_all, level = Level::DEBUG)] fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)> { - let (shared_max, nbackfills, last_checksum) = { + turso_assert!( + self.max_frame_read_lock_index.get().eq(&NO_LOCK_HELD), + "cannot start a new read tx without ending an existing one" + ); + let (shared_max, nbackfills, last_checksum, checkpoint_seq) = { let shared = self.get_shared(); let mx = shared.max_frame.load(Ordering::SeqCst); let nb = shared.nbackfills.load(Ordering::SeqCst); let ck = shared.last_checksum; - (mx, nb, ck) + let checkpoint_seq = shared.wal_header.lock().checkpoint_seq; + (mx, nb, ck, checkpoint_seq) }; let db_changed = shared_max > self.max_frame; // WAL is already fully back‑filled into the main DB image - // (mxFrame == nBackfill). Readers can therefore ignore the + // (mxFrame == nBackfill). Readers can therefore ignore the // WAL and fetch pages directly from the DB file. We do this - // by taking read‑lock 0. + // by taking read‑lock 0, and capturing the latest state. if shared_max == nbackfills { - let l0 = &mut self.get_shared().read_locks[0]; - if !l0.read() { + let lock_idx = 0; + if !self.get_shared().read_locks[lock_idx].read() { return Ok((LimboResult::Busy, db_changed)); } - self.max_frame = shared_max; // we need to keep self.max_frame set to the appropriate // max frame in the wal at the time this transaction starts. - // but here we set min_frame=max_frame + 1 to keep an empty snapshot window, - // to demonstrate that we do not care about any frames, - // while still capturing a snapshot that we may need if we ever want to upgrade - // to a write transaction. 
- self.min_frame = shared_max.saturating_add(1); - self.max_frame_read_lock_index.set(0); - self.has_snapshot.set(true); + self.max_frame = shared_max; + self.max_frame_read_lock_index.set(lock_idx); + self.min_frame = nbackfills + 1; self.last_checksum = last_checksum; return Ok((LimboResult::Ok, db_changed)); } + // If we get this far, it means that the reader will want to use + // the WAL to get at content from recent commits. The job now is + // to select one of the aReadMark[] entries that is closest to + // but not exceeding pWal->hdr.mxFrame and lock that entry. // Find largest mark <= mx among slots 1..N let mut best_idx: i64 = -1; let mut best_mark: u32 = 0; @@ -725,25 +738,54 @@ impl Wal for WalFile { return Ok((LimboResult::Busy, db_changed)); } - // Now take a shared read on that slot - let (min_frame, start_pages) = { + // Now take a shared read on that slot, and if we are successful, + // grab another snapshot of the shared state. + let (mx2, nb2, cksm2, start_pages, ckpt_seq2) = { let shared = self.get_shared(); - let lock = &mut shared.read_locks[best_idx as usize]; - if !lock.read() { + if !shared.read_locks[best_idx as usize].read() { + // TODO: we should retry here instead of always returning Busy return Ok((LimboResult::Busy, db_changed)); } ( - shared.nbackfills.load(Ordering::SeqCst) + 1, + shared.max_frame.load(Ordering::SeqCst), + shared.nbackfills.load(Ordering::SeqCst), + shared.last_checksum, shared.pages_in_frames.lock().len(), + shared.wal_header.lock().checkpoint_seq, ) }; - self.min_frame = min_frame; + // sqlite/src/wal.c 3225 + // Now that the read-lock has been obtained, check that neither the + // value in the aReadMark[] array or the contents of the wal-index + // header have changed. 
+ // + // It is necessary to check that the wal-index header did not change + // between the time it was read and when the shared-lock was obtained + // on WAL_READ_LOCK(mxI) was obtained to account for the possibility + // that the log file may have been wrapped by a writer, or that frames + // that occur later in the log than pWal->hdr.mxFrame may have been + // copied into the database by a checkpointer. If either of these things + // happened, then reading the database with the current value of + // pWal->hdr.mxFrame risks reading a corrupted snapshot. So, retry + // instead. + // + // Before checking that the live wal-index header has not changed + // since it was read, set Wal.minFrame to the first frame in the wal + // file that has not yet been checkpointed. This client will not need + // to read any frames earlier than minFrame from the wal file - they + // can be safely read directly from the database file. + self.min_frame = nb2 + 1; + if mx2 != shared_max + || nb2 != nbackfills + || cksm2 != last_checksum + || ckpt_seq2 != checkpoint_seq + { + return Err(LimboError::Busy); + } self.max_frame = best_mark as u64; self.start_pages_in_frames = start_pages; self.max_frame_read_lock_index.set(best_idx as usize); - self.has_snapshot.set(true); - tracing::debug!( "begin_read_tx(min={}, max={}, slot={}, max_frame_in_wal={})", self.min_frame, @@ -751,7 +793,6 @@ impl Wal for WalFile { best_idx, shared_max ); - Ok((LimboResult::Ok, db_changed)) } @@ -764,10 +805,9 @@ impl Wal for WalFile { let rl = &mut self.get_shared().read_locks[slot]; rl.unlock(); self.max_frame_read_lock_index.set(NO_LOCK_HELD); - self.has_snapshot.set(false); tracing::debug!("end_read_tx(slot={slot})"); } else { - tracing::debug!("end_read_tx(no snapshot)"); + tracing::debug!("end_read_tx(slot=no_lock)"); } } @@ -775,14 +815,18 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn begin_write_tx(&mut self) -> Result { let shared = self.get_shared(); + // sqlite/src/wal.c 
3702 + // Cannot start a write transaction without first holding a read + // transaction. + // assert(pWal->readLock >= 0); + // assert(pWal->writeLock == 0 && pWal->iReCksum == 0); + turso_assert!( + self.max_frame_read_lock_index.get() != NO_LOCK_HELD, + "must have a read transaction to begin a write transaction" + ); if !shared.write_lock.write() { return Ok(LimboResult::Busy); } - if !self.has_snapshot.get() { - // In SQLite this cannot happen (assert). Either assert or handle like Busy. - shared.write_lock.unlock(); - return Ok(LimboResult::Busy); - } let (shared_max, nbackfills, last_checksum) = { let shared = self.get_shared(); ( @@ -818,10 +862,10 @@ impl Wal for WalFile { return Ok(None); } let shared = self.get_shared(); - // if we have read_lock 0, we are reading straight from the db file let frames = shared.frame_cache.lock(); + let range = self.min_frame..=self.max_frame; if let Some(list) = frames.get(&page_id) { - if let Some(f) = list.iter().rev().find(|f| **f <= self.max_frame) { + if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) { return Ok(Some(*f)); } } @@ -1125,6 +1169,10 @@ impl Wal for WalFile { shared.last_checksum = self.last_checksum; Ok(()) } + #[cfg(debug_assertions)] + fn as_any(&self) -> &dyn std::any::Any { + self + } } impl WalFile { @@ -1160,7 +1208,6 @@ impl WalFile { max_frame: 0, current_page: 0, }, - has_snapshot: false.into(), checkpoint_threshold: 1000, buffer_pool, syncing: Rc::new(Cell::new(false)), @@ -1221,6 +1268,7 @@ impl WalFile { /// the WAL file has been truncated and we are writing the first /// frame since then. We need to ensure that the header is initialized. 
fn ensure_header_if_needed(&mut self) -> Result<()> { + tracing::debug!("ensure_header_if_needed"); self.last_checksum = { let shared = self.get_shared(); if shared.max_frame.load(Ordering::SeqCst) != 0 { @@ -1286,8 +1334,7 @@ impl WalFile { } // acquire the appropriate exclusive locks depending on the checkpoint mode self.acquire_proper_checkpoint_guard(mode)?; - self.ongoing_checkpoint.max_frame = - self.determine_max_safe_checkpoint_frame(mode); + self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame(); self.ongoing_checkpoint.min_frame = nbackfills + 1; self.ongoing_checkpoint.current_page = 0; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; @@ -1297,8 +1344,7 @@ impl WalFile { self.ongoing_checkpoint.max_frame, ); } - // Find the next page that has a frame in the safe interval and - // schedule a read of that frame. + // Find the next page that has a frame in the safe interval and schedule a read of that frame. CheckpointState::ReadFrame => { let shared = self.get_shared(); let min_frame = self.ongoing_checkpoint.min_frame; @@ -1476,38 +1522,31 @@ impl WalFile { /// just respect the first busy slot and move on. /// /// Locking rules: - /// This routine **tries** to take an exclusive (write) lock on each slot to + /// This routine tries to take an exclusive (write) lock on each slot to /// update/clean it. If the try-lock fails: /// PASSIVE: do not wait; just lower `mxSafeFrame` and break. /// Others: lower `mxSafeFrame` and continue scanning. /// /// We never modify slot values while a reader holds that slot. 
- fn determine_max_safe_checkpoint_frame(&self, mode: CheckpointMode) -> u64 { + fn determine_max_safe_checkpoint_frame(&self) -> u64 { let shared = self.get_shared(); let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst); for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) { let this_mark = read_lock.value.load(Ordering::SeqCst); - if this_mark == READMARK_NOT_USED { - continue; - } if this_mark < max_safe_frame as u32 { let busy = !read_lock.write(); if !busy { - // Only adjust, never clear, in ordinary checkpoints - if read_lock_idx == 1 { + let val = if read_lock_idx == 1 { // store the shared max_frame for the default read slot 1 - read_lock - .value - .store(max_safe_frame as u32, Ordering::SeqCst); - } + max_safe_frame as u32 + } else { + READMARK_NOT_USED + }; + read_lock.value.store(val, Ordering::SeqCst); read_lock.unlock(); } else { max_safe_frame = this_mark as u64; - if matches!(mode, CheckpointMode::Passive) { - // Don't keep poking, PASSIVE can't block or spin - break; - } } } } @@ -1768,13 +1807,6 @@ impl WalFileShared { for l in &self.read_locks[2..] { l.value.store(READMARK_NOT_USED, Ordering::SeqCst); } - - // reset read‑marks - self.read_locks[0].value.store(0, Ordering::SeqCst); - self.read_locks[1].value.store(0, Ordering::SeqCst); - for lock in &self.read_locks[2..] 
{ - lock.value.store(READMARK_NOT_USED, Ordering::SeqCst); - } Ok(()) } } @@ -1783,10 +1815,13 @@ impl WalFileShared { pub mod test { use crate::{ result::LimboResult, - storage::{sqlite3_ondisk::WAL_HEADER_SIZE, wal::READMARK_NOT_USED}, + storage::{ + sqlite3_ondisk::{self, WAL_HEADER_SIZE}, + wal::READMARK_NOT_USED, + }, types::IOResult, CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO, - Wal, WalFileShared, IO, + StepResult, Wal, WalFile, WalFileShared, IO, }; #[cfg(unix)] use std::os::unix::fs::MetadataExt; @@ -1909,7 +1944,7 @@ pub mod test { } #[test] - fn restart_checkpoint_resets_wal_state_and_increments_ckpt_seq() { + fn restart_checkpoint_reset_wal_state_handling() { let (db, path) = get_database(); let walpath = { @@ -2273,6 +2308,7 @@ pub mod test { { let pager = conn2.pager.borrow_mut(); let mut wal = pager.wal.borrow_mut(); + let _ = wal.begin_read_tx().unwrap(); let res = wal.begin_write_tx().unwrap(); assert!(matches!(res, LimboResult::Ok), "result: {res:?}"); } @@ -2289,6 +2325,7 @@ pub mod test { "Restart checkpoint should fail when write lock is held" ); + conn2.pager.borrow().wal.borrow().end_read_tx(); // release write lock conn2.pager.borrow().wal.borrow().end_write_tx(); @@ -2301,4 +2338,272 @@ pub mod test { assert!(result.everything_backfilled()); } + + #[test] + #[should_panic(expected = "must have a read transaction to begin a write transaction")] + fn test_wal_read_transaction_required_before_write() { + let (db, _path) = get_database(); + let conn = db.connect().unwrap(); + + conn.execute("create table test(id integer primary key, value text)") + .unwrap(); + + // Attempt to start a write transaction without a read transaction + let pager = conn.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + let _ = wal.begin_write_tx(); + } + + fn check_read_lock_slot(conn: &Arc<Connection>, expected_slot: usize) -> bool { + let pager = conn.pager.borrow(); + let wal = pager.wal.borrow(); + let wal_any = 
wal.as_any(); + if let Some(wal_file) = wal_any.downcast_ref::<WalFile>() { + return wal_file.max_frame_read_lock_index.get() == expected_slot; + } + false + } + + #[test] + fn test_wal_multiple_readers_at_different_frames() { + let (db, _path) = get_database(); + let conn_writer = db.connect().unwrap(); + + conn_writer + .execute("CREATE TABLE test(id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + fn start_reader(conn: &Arc<Connection>) -> (u64, crate::Statement) { + conn.execute("BEGIN").unwrap(); + let mut stmt = conn.prepare("SELECT * FROM test").unwrap(); + stmt.step().unwrap(); + let frame = conn.pager.borrow().wal.borrow().get_max_frame(); + (frame, stmt) + } + + bulk_inserts(&conn_writer, 3, 5); + + let conn1 = &db.connect().unwrap(); + let (r1_frame, _stmt) = start_reader(conn1); // reader 1 + + bulk_inserts(&conn_writer, 3, 5); + + let conn_r2 = db.connect().unwrap(); + let (r2_frame, _stmt2) = start_reader(&conn_r2); // reader 2 + + bulk_inserts(&conn_writer, 3, 5); + + let conn_r3 = db.connect().unwrap(); + let (r3_frame, _stmt3) = start_reader(&conn_r3); // reader 3 + + assert!(r1_frame < r2_frame && r2_frame < r3_frame); + + // passive checkpoint #1 + let result1 = { + let pager = conn_writer.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive) + }; + assert_eq!(result1.num_checkpointed_frames, r1_frame); + + // finish reader‑1 + conn1.execute("COMMIT").unwrap(); + + // passive checkpoint #2 + let result2 = { + let pager = conn_writer.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive) + }; + assert_eq!( + result1.num_checkpointed_frames + result2.num_checkpointed_frames, + r2_frame + ); + + // verify visible rows + let mut stmt = conn_r2.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); + while !matches!(stmt.step().unwrap(), StepResult::Row) { + stmt.run_once().unwrap(); + } + let r2_cnt: i64 = 
stmt.row().unwrap().get(0).unwrap(); + + let mut stmt2 = conn_r3.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); + while !matches!(stmt2.step().unwrap(), StepResult::Row) { + stmt2.run_once().unwrap(); + } + let r3_cnt: i64 = stmt2.row().unwrap().get(0).unwrap(); + assert_eq!(r2_cnt, 30); + assert_eq!(r3_cnt, 45); + } + + #[test] + fn test_checkpoint_truncate_reset_handling() { + let (db, path) = get_database(); + let conn = db.connect().unwrap(); + + let walpath = { + let mut p = path.clone().into_os_string().into_string().unwrap(); + p.push_str("/test.db-wal"); + std::path::PathBuf::from(p) + }; + + conn.execute("create table test(id integer primary key, value text)") + .unwrap(); + bulk_inserts(&conn, 10, 10); + + // Get size before checkpoint + let size_before = std::fs::metadata(&walpath).unwrap().len(); + assert!(size_before > 0, "WAL file should have content"); + + // Do a TRUNCATE checkpoint + { + let pager = conn.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Truncate); + } + + // Check file size after truncate + let size_after = std::fs::metadata(&walpath).unwrap().len(); + assert_eq!(size_after, 0, "WAL file should be truncated to 0 bytes"); + + // Verify we can still write to the database + conn.execute("INSERT INTO test VALUES (1001, 'after-truncate')") + .unwrap(); + + // Check WAL has new content + let new_size = std::fs::metadata(&walpath).unwrap().len(); + assert!(new_size >= 32, "WAL file too small"); + let hdr = read_wal_header(&walpath); + assert!( + hdr.magic == sqlite3_ondisk::WAL_MAGIC_BE, + "bad WAL magic: {:#X}", + hdr.magic + ); + assert_eq!(hdr.file_format, 3007000); + assert_eq!(hdr.page_size, 4096, "invalid page size"); + assert_eq!(hdr.checkpoint_seq, 0, "invalid checkpoint_seq"); + std::fs::remove_dir_all(path).unwrap(); + } + + fn read_wal_header(path: &std::path::Path) -> sqlite3_ondisk::WalHeader { + use std::{fs::File, io::Read}; + let mut hdr = [0u8; 
32]; + File::open(path).unwrap().read_exact(&mut hdr).unwrap(); + let be = |i| u32::from_be_bytes(hdr[i..i + 4].try_into().unwrap()); + sqlite3_ondisk::WalHeader { + magic: be(0x00), + file_format: be(0x04), + page_size: be(0x08), + checkpoint_seq: be(0x0C), + salt_1: be(0x10), + salt_2: be(0x14), + checksum_1: be(0x18), + checksum_2: be(0x1C), + } + } + + #[test] + fn test_wal_stale_snapshot_in_write_transaction() { + let (db, _path) = get_database(); + let conn1 = db.connect().unwrap(); + let conn2 = db.connect().unwrap(); + + conn1 + .execute("create table test(id integer primary key, value text)") + .unwrap(); + // Start a read transaction on conn2 + { + let pager = conn2.pager.borrow_mut(); + let mut wal = pager.wal.borrow_mut(); + let (res, _) = wal.begin_read_tx().unwrap(); + assert!(matches!(res, LimboResult::Ok)); + } + // Make changes using conn1 + bulk_inserts(&conn1, 5, 5); + // Try to start a write transaction on conn2 with a stale snapshot + let result = { + let pager = conn2.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + wal.begin_write_tx() + }; + // Should get Busy due to stale snapshot + assert!(matches!(result.unwrap(), LimboResult::Busy)); + + // End read transaction and start a fresh one + { + let pager = conn2.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + wal.end_read_tx(); + let (res, _) = wal.begin_read_tx().unwrap(); + assert!(matches!(res, LimboResult::Ok)); + } + // Now write transaction should work + let result = { + let pager = conn2.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + wal.begin_write_tx() + }; + assert!(matches!(result.unwrap(), LimboResult::Ok)); + } + + #[test] + fn test_wal_readlock0_optimization_behavior() { + let (db, _path) = get_database(); + let conn1 = db.connect().unwrap(); + let conn2 = db.connect().unwrap(); + + conn1 + .execute("create table test(id integer primary key, value text)") + .unwrap(); + bulk_inserts(&conn1, 5, 5); + // Do a full checkpoint to move all data to DB file 
+ { + let pager = conn1.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive); + } + + // Start a read transaction on conn2 + { + let pager = conn2.pager.borrow_mut(); + let mut wal = pager.wal.borrow_mut(); + let (res, _) = wal.begin_read_tx().unwrap(); + assert!(matches!(res, LimboResult::Ok)); + } + // should use slot 0, as everything is backfilled + assert!(check_read_lock_slot(&conn2, 0)); + { + let pager = conn1.pager.borrow(); + let wal = pager.wal.borrow(); + let frame = wal.find_frame(5); + // since we hold readlock0, we should ignore the db file and find_frame should return none + assert!(frame.is_ok_and(|f| f.is_none())); + } + // Try checkpoint, should fail because reader has slot 0 + { + let pager = conn1.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + let result = wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Restart); + + assert!( + matches!(result, Err(LimboError::Busy)), + "RESTART checkpoint should fail when a reader is using slot 0" + ); + } + // End the read transaction + { + let pager = conn2.pager.borrow(); + let wal = pager.wal.borrow(); + wal.end_read_tx(); + } + { + let pager = conn1.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + let result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart); + assert!( + result.everything_backfilled(), + "RESTART checkpoint should succeed after reader releases slot 0" + ); + } + } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 85ed17698..8d0fbaea8 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -6317,6 +6317,9 @@ pub fn op_open_ephemeral( } OpOpenEphemeralState::StartingTxn { pager } => { tracing::trace!("StartingTxn"); + pager + .begin_read_tx() // we have to begin a read tx before beginning a write + .expect("Failed to start read transaction"); return_if_io!(pager.begin_write_tx()); state.op_open_ephemeral_state = 
OpOpenEphemeralState::CreateBtree { pager: pager.clone(),