Finish wal transaction handling and add more wal and chkpt testing

This commit is contained in:
PThorpe92
2025-07-29 11:19:21 -04:00
committed by Jussi Saurio
parent 8806b77d26
commit 2c3a9fe5ef
7 changed files with 379 additions and 138 deletions

View File

@@ -731,7 +731,8 @@ impl turso_core::DatabaseStorage for DatabaseFile {
len: usize,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
self.file.truncate(len, c)
let c = self.file.truncate(len, c)?;
Ok(c)
}
}

View File

@@ -1,37 +0,0 @@
const fs = require('node:fs');
class VFS {
constructor() {
}
open(path, flags) {
return fs.openSync(path, flags);
}
close(fd) {
fs.closeSync(fd);
}
pread(fd, buffer, offset) {
return fs.readSync(fd, buffer, 0, buffer.length, offset);
}
pwrite(fd, buffer, offset) {
return fs.writeSync(fd, buffer, 0, buffer.length, offset);
}
size(fd) {
let stats = fs.fstatSync(fd);
return BigInt(stats.size);
}
sync(fd) {
fs.fsyncSync(fd);
}
truncate(fd, size) {
fs.ftruncateSync(fs, size)
}
}
module.exports = { VFS };

View File

@@ -1,33 +0,0 @@
/**
 * VFS shim that delegates every operation to a host-provided `self.vfs`
 * object — presumably installed on the worker/global scope by the
 * embedder before this module is used (TODO confirm where `self.vfs`
 * is set up).
 */
export class VFS {
constructor() {
// NOTE(review): returning an object from a constructor makes `new VFS()`
// evaluate to `self.vfs` itself, so the delegating methods below are
// bypassed on instances — confirm this is intentional.
return self.vfs;
}
// NOTE(review): `flags` is accepted but not forwarded to the host vfs —
// verify the underlying implementation does not need it.
open(path, flags) {
return self.vfs.open(path);
}
// Close a previously opened handle.
close(fd) {
return self.vfs.close(fd);
}
// Positional read into `buffer` at byte `offset`.
pread(fd, buffer, offset) {
return self.vfs.pread(fd, buffer, offset);
}
// Positional write of `buffer` at byte `offset`.
pwrite(fd, buffer, offset) {
return self.vfs.pwrite(fd, buffer, offset);
}
// File size of `fd`, as reported by the host vfs.
size(fd) {
return self.vfs.size(fd);
}
// Flush pending writes for `fd`.
sync(fd) {
return self.vfs.sync(fd);
}
// Truncate the file behind `fd` to `size` bytes.
truncate(fd, size) {
return self.vfs.truncate(fd, size)
}
}

View File

@@ -226,7 +226,7 @@ impl Clock for UringIO {
/// use the callback pointer as the user_data for the operation as is
/// common practice for io_uring to prevent more indirection
fn get_key(c: Completion) -> u64 {
Arc::into_raw(c.inner) as u64
Arc::into_raw(c.inner.clone()) as u64
}
#[inline(always)]

View File

@@ -73,7 +73,8 @@ impl DatabaseStorage for DatabaseFile {
#[instrument(skip_all, level = Level::INFO)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
self.file.truncate(len, c)
let c = self.file.truncate(len, c)?;
Ok(c)
}
}
@@ -131,7 +132,8 @@ impl DatabaseStorage for FileMemoryStorage {
#[instrument(skip_all, level = Level::INFO)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
self.file.truncate(len, c)
let c = self.file.truncate(len, c)?;
Ok(c)
}
}

View File

@@ -275,6 +275,9 @@ pub trait Wal {
fn get_max_frame(&self) -> u64;
fn get_min_frame(&self) -> u64;
fn rollback(&mut self) -> Result<()>;
#[cfg(debug_assertions)]
fn as_any(&self) -> &dyn std::any::Any;
}
/// A dummy WAL implementation that does nothing.
@@ -371,6 +374,10 @@ impl Wal for DummyWAL {
fn rollback(&mut self) -> Result<()> {
Ok(())
}
#[cfg(debug_assertions)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
// Syncing requires a state machine because we need to schedule a sync and then wait until it is
@@ -429,7 +436,6 @@ pub struct WalFile {
shared: Arc<UnsafeCell<WalFileShared>>,
ongoing_checkpoint: OngoingCheckpoint,
checkpoint_threshold: usize,
has_snapshot: Cell<bool>,
// min and max frames for this connection
/// This is the index to the read_lock in WalFileShared that we are holding. This lock contains
/// the max frame for this connection.
@@ -660,41 +666,48 @@ impl Drop for CheckpointLocks {
}
impl Wal for WalFile {
/// Begin a read transaction.
/// Begin a read transaction. The caller must ensure that there is not already
/// an ongoing read transaction.
/// sqlite/src/wal.c 3023
/// assert(pWal->readLock < 0); /* Not currently locked */
#[instrument(skip_all, level = Level::DEBUG)]
fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)> {
let (shared_max, nbackfills, last_checksum) = {
turso_assert!(
self.max_frame_read_lock_index.get().eq(&NO_LOCK_HELD),
"cannot start a new read tx without ending an existing one"
);
let (shared_max, nbackfills, last_checksum, checkpoint_seq) = {
let shared = self.get_shared();
let mx = shared.max_frame.load(Ordering::SeqCst);
let nb = shared.nbackfills.load(Ordering::SeqCst);
let ck = shared.last_checksum;
(mx, nb, ck)
let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
(mx, nb, ck, checkpoint_seq)
};
let db_changed = shared_max > self.max_frame;
// WAL is already fully backfilled into the main DB image
// (mxFrame == nBackfill). Readers can therefore ignore the
// (mxFrame == nBackfill). Readers can therefore ignore the
// WAL and fetch pages directly from the DB file. We do this
// by taking readlock 0.
// by taking readlock 0, and capturing the latest state.
if shared_max == nbackfills {
let l0 = &mut self.get_shared().read_locks[0];
if !l0.read() {
let lock_idx = 0;
if !self.get_shared().read_locks[lock_idx].read() {
return Ok((LimboResult::Busy, db_changed));
}
self.max_frame = shared_max;
// we need to keep self.max_frame set to the appropriate
// max frame in the wal at the time this transaction starts.
// but here we set min_frame=max_frame + 1 to keep an empty snapshot window,
// to demonstrate that we do not care about any frames,
// while still capturing a snapshot that we may need if we ever want to upgrade
// to a write transaction.
self.min_frame = shared_max.saturating_add(1);
self.max_frame_read_lock_index.set(0);
self.has_snapshot.set(true);
self.max_frame = shared_max;
self.max_frame_read_lock_index.set(lock_idx);
self.min_frame = nbackfills + 1;
self.last_checksum = last_checksum;
return Ok((LimboResult::Ok, db_changed));
}
// If we get this far, it means that the reader will want to use
// the WAL to get at content from recent commits. The job now is
// to select one of the aReadMark[] entries that is closest to
// but not exceeding pWal->hdr.mxFrame and lock that entry.
// Find largest mark <= mx among slots 1..N
let mut best_idx: i64 = -1;
let mut best_mark: u32 = 0;
@@ -725,25 +738,54 @@ impl Wal for WalFile {
return Ok((LimboResult::Busy, db_changed));
}
// Now take a shared read on that slot
let (min_frame, start_pages) = {
// Now take a shared read on that slot, and if we are successful,
// grab another snapshot of the shared state.
let (mx2, nb2, cksm2, start_pages, ckpt_seq2) = {
let shared = self.get_shared();
let lock = &mut shared.read_locks[best_idx as usize];
if !lock.read() {
if !shared.read_locks[best_idx as usize].read() {
// TODO: we should retry here instead of always returning Busy
return Ok((LimboResult::Busy, db_changed));
}
(
shared.nbackfills.load(Ordering::SeqCst) + 1,
shared.max_frame.load(Ordering::SeqCst),
shared.nbackfills.load(Ordering::SeqCst),
shared.last_checksum,
shared.pages_in_frames.lock().len(),
shared.wal_header.lock().checkpoint_seq,
)
};
self.min_frame = min_frame;
// sqlite/src/wal.c 3225
// Now that the read-lock has been obtained, check that neither the
// value in the aReadMark[] array or the contents of the wal-index
// header have changed.
//
// It is necessary to check that the wal-index header did not change
// between the time it was read and when the shared-lock was obtained
// on WAL_READ_LOCK(mxI) was obtained to account for the possibility
// that the log file may have been wrapped by a writer, or that frames
// that occur later in the log than pWal->hdr.mxFrame may have been
// copied into the database by a checkpointer. If either of these things
// happened, then reading the database with the current value of
// pWal->hdr.mxFrame risks reading a corrupted snapshot. So, retry
// instead.
//
// Before checking that the live wal-index header has not changed
// since it was read, set Wal.minFrame to the first frame in the wal
// file that has not yet been checkpointed. This client will not need
// to read any frames earlier than minFrame from the wal file - they
// can be safely read directly from the database file.
self.min_frame = nb2 + 1;
if mx2 != shared_max
|| nb2 != nbackfills
|| cksm2 != last_checksum
|| ckpt_seq2 != checkpoint_seq
{
return Err(LimboError::Busy);
}
self.max_frame = best_mark as u64;
self.start_pages_in_frames = start_pages;
self.max_frame_read_lock_index.set(best_idx as usize);
self.has_snapshot.set(true);
tracing::debug!(
"begin_read_tx(min={}, max={}, slot={}, max_frame_in_wal={})",
self.min_frame,
@@ -751,7 +793,6 @@ impl Wal for WalFile {
best_idx,
shared_max
);
Ok((LimboResult::Ok, db_changed))
}
@@ -764,10 +805,9 @@ impl Wal for WalFile {
let rl = &mut self.get_shared().read_locks[slot];
rl.unlock();
self.max_frame_read_lock_index.set(NO_LOCK_HELD);
self.has_snapshot.set(false);
tracing::debug!("end_read_tx(slot={slot})");
} else {
tracing::debug!("end_read_tx(no snapshot)");
tracing::debug!("end_read_tx(slot=no_lock)");
}
}
@@ -775,14 +815,18 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn begin_write_tx(&mut self) -> Result<LimboResult> {
let shared = self.get_shared();
// sqlite/src/wal.c 3702
// Cannot start a write transaction without first holding a read
// transaction.
// assert(pWal->readLock >= 0);
// assert(pWal->writeLock == 0 && pWal->iReCksum == 0);
turso_assert!(
self.max_frame_read_lock_index.get() != NO_LOCK_HELD,
"must have a read transaction to begin a write transaction"
);
if !shared.write_lock.write() {
return Ok(LimboResult::Busy);
}
if !self.has_snapshot.get() {
// In SQLite this cannot happen (assert). Either assert or handle like Busy.
shared.write_lock.unlock();
return Ok(LimboResult::Busy);
}
let (shared_max, nbackfills, last_checksum) = {
let shared = self.get_shared();
(
@@ -818,10 +862,10 @@ impl Wal for WalFile {
return Ok(None);
}
let shared = self.get_shared();
// if we have read_lock 0, we are reading straight from the db file
let frames = shared.frame_cache.lock();
let range = self.min_frame..=self.max_frame;
if let Some(list) = frames.get(&page_id) {
if let Some(f) = list.iter().rev().find(|f| **f <= self.max_frame) {
if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) {
return Ok(Some(*f));
}
}
@@ -1125,6 +1169,10 @@ impl Wal for WalFile {
shared.last_checksum = self.last_checksum;
Ok(())
}
#[cfg(debug_assertions)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
impl WalFile {
@@ -1160,7 +1208,6 @@ impl WalFile {
max_frame: 0,
current_page: 0,
},
has_snapshot: false.into(),
checkpoint_threshold: 1000,
buffer_pool,
syncing: Rc::new(Cell::new(false)),
@@ -1221,6 +1268,7 @@ impl WalFile {
/// the WAL file has been truncated and we are writing the first
/// frame since then. We need to ensure that the header is initialized.
fn ensure_header_if_needed(&mut self) -> Result<()> {
tracing::debug!("ensure_header_if_needed");
self.last_checksum = {
let shared = self.get_shared();
if shared.max_frame.load(Ordering::SeqCst) != 0 {
@@ -1286,8 +1334,7 @@ impl WalFile {
}
// acquire the appropriate exclusive locks depending on the checkpoint mode
self.acquire_proper_checkpoint_guard(mode)?;
self.ongoing_checkpoint.max_frame =
self.determine_max_safe_checkpoint_frame(mode);
self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame();
self.ongoing_checkpoint.min_frame = nbackfills + 1;
self.ongoing_checkpoint.current_page = 0;
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
@@ -1297,8 +1344,7 @@ impl WalFile {
self.ongoing_checkpoint.max_frame,
);
}
// Find the next page that has a frame in the safe interval and
// schedule a read of that frame.
// Find the next page that has a frame in the safe interval and schedule a read of that frame.
CheckpointState::ReadFrame => {
let shared = self.get_shared();
let min_frame = self.ongoing_checkpoint.min_frame;
@@ -1476,38 +1522,31 @@ impl WalFile {
/// just respect the first busy slot and move on.
///
/// Locking rules:
/// This routine **tries** to take an exclusive (write) lock on each slot to
/// This routine tries to take an exclusive (write) lock on each slot to
/// update/clean it. If the try-lock fails:
/// PASSIVE: do not wait; just lower `mxSafeFrame` and break.
/// Others: lower `mxSafeFrame` and continue scanning.
///
/// We never modify slot values while a reader holds that slot.
fn determine_max_safe_checkpoint_frame(&self, mode: CheckpointMode) -> u64 {
fn determine_max_safe_checkpoint_frame(&self) -> u64 {
let shared = self.get_shared();
let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst);
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
let this_mark = read_lock.value.load(Ordering::SeqCst);
if this_mark == READMARK_NOT_USED {
continue;
}
if this_mark < max_safe_frame as u32 {
let busy = !read_lock.write();
if !busy {
// Only adjust, never clear, in ordinary checkpoints
if read_lock_idx == 1 {
let val = if read_lock_idx == 1 {
// store the shared max_frame for the default read slot 1
read_lock
.value
.store(max_safe_frame as u32, Ordering::SeqCst);
}
max_safe_frame as u32
} else {
READMARK_NOT_USED
};
read_lock.value.store(val, Ordering::SeqCst);
read_lock.unlock();
} else {
max_safe_frame = this_mark as u64;
if matches!(mode, CheckpointMode::Passive) {
// Don't keep poking, PASSIVE can't block or spin
break;
}
}
}
}
@@ -1768,13 +1807,6 @@ impl WalFileShared {
for l in &self.read_locks[2..] {
l.value.store(READMARK_NOT_USED, Ordering::SeqCst);
}
// reset readmarks
self.read_locks[0].value.store(0, Ordering::SeqCst);
self.read_locks[1].value.store(0, Ordering::SeqCst);
for lock in &self.read_locks[2..] {
lock.value.store(READMARK_NOT_USED, Ordering::SeqCst);
}
Ok(())
}
}
@@ -1783,10 +1815,13 @@ impl WalFileShared {
pub mod test {
use crate::{
result::LimboResult,
storage::{sqlite3_ondisk::WAL_HEADER_SIZE, wal::READMARK_NOT_USED},
storage::{
sqlite3_ondisk::{self, WAL_HEADER_SIZE},
wal::READMARK_NOT_USED,
},
types::IOResult,
CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO,
Wal, WalFileShared, IO,
StepResult, Wal, WalFile, WalFileShared, IO,
};
#[cfg(unix)]
use std::os::unix::fs::MetadataExt;
@@ -1909,7 +1944,7 @@ pub mod test {
}
#[test]
fn restart_checkpoint_resets_wal_state_and_increments_ckpt_seq() {
fn restart_checkpoint_reset_wal_state_handling() {
let (db, path) = get_database();
let walpath = {
@@ -2273,6 +2308,7 @@ pub mod test {
{
let pager = conn2.pager.borrow_mut();
let mut wal = pager.wal.borrow_mut();
let _ = wal.begin_read_tx().unwrap();
let res = wal.begin_write_tx().unwrap();
assert!(matches!(res, LimboResult::Ok), "result: {res:?}");
}
@@ -2289,6 +2325,7 @@ pub mod test {
"Restart checkpoint should fail when write lock is held"
);
conn2.pager.borrow().wal.borrow().end_read_tx();
// release write lock
conn2.pager.borrow().wal.borrow().end_write_tx();
@@ -2301,4 +2338,272 @@ pub mod test {
assert!(result.everything_backfilled());
}
/// Starting a write transaction without first holding a read transaction
/// must trip the debug assertion in `begin_write_tx` (mirrors the
/// `assert(pWal->readLock >= 0)` precondition in sqlite's wal.c).
#[test]
#[should_panic(expected = "must have a read transaction to begin a write transaction")]
fn test_wal_read_transaction_required_before_write() {
    let (database, _dir) = get_database();
    let connection = database.connect().unwrap();
    connection
        .execute("create table test(id integer primary key, value text)")
        .unwrap();
    // Reach into the WAL directly, deliberately skipping `begin_read_tx`.
    let pager_ref = connection.pager.borrow();
    let _ = pager_ref.wal.borrow_mut().begin_write_tx();
}
/// Returns true when `conn`'s WAL currently holds read-lock slot
/// `expected_slot`. Peeks at the concrete `WalFile` through the
/// debug-only `as_any` hook; returns false if the WAL is not a `WalFile`.
fn check_read_lock_slot(conn: &Arc<Connection>, expected_slot: usize) -> bool {
    let pager = conn.pager.borrow();
    let wal = pager.wal.borrow();
    wal.as_any()
        .downcast_ref::<WalFile>()
        .map_or(false, |wal_file| {
            wal_file.max_frame_read_lock_index.get() == expected_slot
        })
}
#[test]
// Three readers pin WAL snapshots at successively later frames. Passive
// checkpoints must only backfill up to the oldest live reader's snapshot,
// and each remaining reader must keep seeing exactly its own snapshot's
// row count afterwards.
fn test_wal_multiple_readers_at_different_frames() {
let (db, _path) = get_database();
let conn_writer = db.connect().unwrap();
conn_writer
.execute("CREATE TABLE test(id INTEGER PRIMARY KEY, value TEXT)")
.unwrap();
// Opens a read transaction and steps a statement once so the connection
// pins a WAL snapshot; returns that snapshot's max frame plus the live
// statement (kept alive by the caller so the read tx stays open).
fn start_reader(conn: &Arc<Connection>) -> (u64, crate::Statement) {
conn.execute("BEGIN").unwrap();
let mut stmt = conn.prepare("SELECT * FROM test").unwrap();
stmt.step().unwrap();
let frame = conn.pager.borrow().wal.borrow().get_max_frame();
(frame, stmt)
}
// Interleave inserts with new readers so each reader pins a strictly
// larger max frame than the previous one.
bulk_inserts(&conn_writer, 3, 5);
let conn1 = &db.connect().unwrap();
let (r1_frame, _stmt) = start_reader(conn1); // reader 1
bulk_inserts(&conn_writer, 3, 5);
let conn_r2 = db.connect().unwrap();
let (r2_frame, _stmt2) = start_reader(&conn_r2); // reader 2
bulk_inserts(&conn_writer, 3, 5);
let conn_r3 = db.connect().unwrap();
let (r3_frame, _stmt3) = start_reader(&conn_r3); // reader 3
assert!(r1_frame < r2_frame && r2_frame < r3_frame);
// passive checkpoint #1
let result1 = {
let pager = conn_writer.pager.borrow();
let mut wal = pager.wal.borrow_mut();
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive)
};
// A passive checkpoint cannot backfill past the oldest reader's snapshot.
assert_eq!(result1.num_checkpointed_frames, r1_frame);
// finish reader1
conn1.execute("COMMIT").unwrap();
// passive checkpoint #2: with reader 1 gone, backfill may now advance up
// to reader 2's snapshot, so the two checkpoints together cover r2_frame.
let result2 = {
let pager = conn_writer.pager.borrow();
let mut wal = pager.wal.borrow_mut();
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive)
};
assert_eq!(
result1.num_checkpointed_frames + result2.num_checkpointed_frames,
r2_frame
);
// verify visible rows: each reader still sees its own snapshot's count.
// bulk_inserts(conn, 3, 5) presumably inserts 15 rows per call — TODO
// confirm: reader 2 saw two rounds (30 rows), reader 3 three rounds (45).
let mut stmt = conn_r2.query("SELECT COUNT(*) FROM test").unwrap().unwrap();
while !matches!(stmt.step().unwrap(), StepResult::Row) {
stmt.run_once().unwrap();
}
let r2_cnt: i64 = stmt.row().unwrap().get(0).unwrap();
let mut stmt2 = conn_r3.query("SELECT COUNT(*) FROM test").unwrap().unwrap();
while !matches!(stmt2.step().unwrap(), StepResult::Row) {
stmt2.run_once().unwrap();
}
let r3_cnt: i64 = stmt2.row().unwrap().get(0).unwrap();
assert_eq!(r2_cnt, 30);
assert_eq!(r3_cnt, 45);
}
/// After a TRUNCATE checkpoint the WAL file must be zero bytes on disk,
/// and the next write must re-create a valid WAL header (correct magic,
/// format version, page size, and a reset checkpoint sequence).
#[test]
fn test_checkpoint_truncate_reset_handling() {
    let (db, path) = get_database();
    let conn = db.connect().unwrap();
    // The WAL lives next to the database file inside the temp dir.
    // (Idiom fix: build the path with `Path::join` instead of manual
    // OsString -> String conversion and string concatenation.)
    let walpath = path.join("test.db-wal");
    conn.execute("create table test(id integer primary key, value text)")
        .unwrap();
    bulk_inserts(&conn, 10, 10);
    // Get size before checkpoint
    let size_before = std::fs::metadata(&walpath).unwrap().len();
    assert!(size_before > 0, "WAL file should have content");
    // Do a TRUNCATE checkpoint
    {
        let pager = conn.pager.borrow();
        let mut wal = pager.wal.borrow_mut();
        run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Truncate);
    }
    // Check file size after truncate
    let size_after = std::fs::metadata(&walpath).unwrap().len();
    assert_eq!(size_after, 0, "WAL file should be truncated to 0 bytes");
    // Verify we can still write to the database
    conn.execute("INSERT INTO test VALUES (1001, 'after-truncate')")
        .unwrap();
    // Check WAL has new content: at minimum the 32-byte WAL header.
    let new_size = std::fs::metadata(&walpath).unwrap().len();
    assert!(new_size >= 32, "WAL file too small");
    let hdr = read_wal_header(&walpath);
    assert!(
        hdr.magic == sqlite3_ondisk::WAL_MAGIC_BE,
        "bad WAL magic: {:#X}",
        hdr.magic
    );
    assert_eq!(hdr.file_format, 3007000);
    assert_eq!(hdr.page_size, 4096, "invalid page size");
    assert_eq!(hdr.checkpoint_seq, 0, "invalid checkpoint_seq");
    std::fs::remove_dir_all(path).unwrap();
}
/// Reads the 32-byte WAL header at `path` and decodes its eight
/// big-endian u32 fields into a `sqlite3_ondisk::WalHeader`.
fn read_wal_header(path: &std::path::Path) -> sqlite3_ondisk::WalHeader {
    use std::{fs::File, io::Read};
    let mut raw = [0u8; 32];
    File::open(path).unwrap().read_exact(&mut raw).unwrap();
    // The header is exactly eight consecutive big-endian u32 words.
    let mut words = [0u32; 8];
    for (i, chunk) in raw.chunks_exact(4).enumerate() {
        words[i] = u32::from_be_bytes(chunk.try_into().unwrap());
    }
    sqlite3_ondisk::WalHeader {
        magic: words[0],
        file_format: words[1],
        page_size: words[2],
        checkpoint_seq: words[3],
        salt_1: words[4],
        salt_2: words[5],
        checksum_1: words[6],
        checksum_2: words[7],
    }
}
#[test]
// A write transaction must be refused (Busy) when the connection's read
// snapshot is stale — i.e. another connection committed frames after this
// connection's read transaction began. Re-taking the read transaction
// refreshes the snapshot and the write must then succeed.
fn test_wal_stale_snapshot_in_write_transaction() {
let (db, _path) = get_database();
let conn1 = db.connect().unwrap();
let conn2 = db.connect().unwrap();
conn1
.execute("create table test(id integer primary key, value text)")
.unwrap();
// Start a read transaction on conn2, pinning its current snapshot.
{
let pager = conn2.pager.borrow_mut();
let mut wal = pager.wal.borrow_mut();
let (res, _) = wal.begin_read_tx().unwrap();
assert!(matches!(res, LimboResult::Ok));
}
// Make changes using conn1 so conn2's snapshot falls behind the shared
// WAL max frame.
bulk_inserts(&conn1, 5, 5);
// Try to start a write transaction on conn2 with a stale snapshot
let result = {
let pager = conn2.pager.borrow();
let mut wal = pager.wal.borrow_mut();
wal.begin_write_tx()
};
// Should get Busy due to stale snapshot
assert!(matches!(result.unwrap(), LimboResult::Busy));
// End read transaction and start a fresh one, which captures the
// now-current shared state.
{
let pager = conn2.pager.borrow();
let mut wal = pager.wal.borrow_mut();
wal.end_read_tx();
let (res, _) = wal.begin_read_tx().unwrap();
assert!(matches!(res, LimboResult::Ok));
}
// Now write transaction should work
let result = {
let pager = conn2.pager.borrow();
let mut wal = pager.wal.borrow_mut();
wal.begin_write_tx()
};
assert!(matches!(result.unwrap(), LimboResult::Ok));
}
#[test]
// When the WAL is fully backfilled, a new reader takes read-lock slot 0
// (the "ignore the WAL, read straight from the db file" fast path), and a
// RESTART checkpoint must report Busy while that slot is held.
fn test_wal_readlock0_optimization_behavior() {
let (db, _path) = get_database();
let conn1 = db.connect().unwrap();
let conn2 = db.connect().unwrap();
conn1
.execute("create table test(id integer primary key, value text)")
.unwrap();
bulk_inserts(&conn1, 5, 5);
// Do a full checkpoint to move all data to DB file
{
let pager = conn1.pager.borrow();
let mut wal = pager.wal.borrow_mut();
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive);
}
// Start a read transaction on conn2
{
let pager = conn2.pager.borrow_mut();
let mut wal = pager.wal.borrow_mut();
let (res, _) = wal.begin_read_tx().unwrap();
assert!(matches!(res, LimboResult::Ok));
}
// should use slot 0, as everything is backfilled
assert!(check_read_lock_slot(&conn2, 0));
{
let pager = conn1.pager.borrow();
let wal = pager.wal.borrow();
let frame = wal.find_frame(5);
// With everything backfilled the snapshot window is empty, so
// find_frame must return None and page reads fall through to the
// db file instead of the WAL.
assert!(frame.is_ok_and(|f| f.is_none()));
}
// Try checkpoint, should fail because reader has slot 0
{
let pager = conn1.pager.borrow();
let mut wal = pager.wal.borrow_mut();
let result = wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Restart);
assert!(
matches!(result, Err(LimboError::Busy)),
"RESTART checkpoint should fail when a reader is using slot 0"
);
}
// End the read transaction
{
let pager = conn2.pager.borrow();
let wal = pager.wal.borrow();
wal.end_read_tx();
}
// With slot 0 released, a RESTART checkpoint must now fully backfill.
{
let pager = conn1.pager.borrow();
let mut wal = pager.wal.borrow_mut();
let result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart);
assert!(
result.everything_backfilled(),
"RESTART checkpoint should succeed after reader releases slot 0"
);
}
}
}

View File

@@ -6317,6 +6317,9 @@ pub fn op_open_ephemeral(
}
OpOpenEphemeralState::StartingTxn { pager } => {
tracing::trace!("StartingTxn");
pager
.begin_read_tx() // we have to begin a read tx before beginning a write
.expect("Failed to start read transaction");
return_if_io!(pager.begin_write_tx());
state.op_open_ephemeral_state = OpOpenEphemeralState::CreateBtree {
pager: pager.clone(),