Merge 'WAL txn: fix reads from DB file' from Nikita Sivukhin

- Transaction which was started with max_frame = 0 and
max_frame_read_lock_index = 0 can write to the WAL and in this case it
needs to read data back from WAL and not the DB file.
- Without cache spilling its hard to reproduce this issue for the turso-
db now, but I found this issue with sync-engine which do weird stuff
with the WAL which "simulates" cache spilling behaviour to some extent.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>
Reviewed-by: Preston Thorpe <preston@turso.tech>

Closes #2735
This commit is contained in:
Preston Thorpe
2025-08-25 08:34:17 -04:00
committed by GitHub
4 changed files with 99 additions and 22 deletions

View File

@@ -1416,22 +1416,25 @@ impl Pager {
"wal_insert_frame() called on database without WAL".to_string(),
));
};
let mut wal = wal.borrow_mut();
let (header, raw_page) = parse_wal_frame_header(frame);
wal.write_frame_raw(
self.buffer_pool.clone(),
frame_no,
header.page_number as u64,
header.db_size as u64,
raw_page,
)?;
if let Some(page) = self.cache_get(header.page_number as usize) {
let content = page.get_contents();
content.as_ptr().copy_from_slice(raw_page);
turso_assert!(
page.get().id == header.page_number as usize,
"page has unexpected id"
);
{
let mut wal = wal.borrow_mut();
wal.write_frame_raw(
self.buffer_pool.clone(),
frame_no,
header.page_number as u64,
header.db_size as u64,
raw_page,
)?;
if let Some(page) = self.cache_get(header.page_number as usize) {
let content = page.get_contents();
content.as_ptr().copy_from_slice(raw_page);
turso_assert!(
page.get().id == header.page_number as usize,
"page has unexpected id"
);
}
}
if header.page_number == 1 {
let db_size = self

View File

@@ -814,14 +814,14 @@ impl Wal for WalFile {
// WAL and fetch pages directly from the DB file. We do this
// by taking readlock 0, and capturing the latest state.
if shared_max == nbackfills {
let lock_idx = 0;
if !self.get_shared().read_locks[lock_idx].read() {
let lock_0_idx = 0;
if !self.get_shared().read_locks[lock_0_idx].read() {
return Ok((LimboResult::Busy, db_changed));
}
// we need to keep self.max_frame set to the appropriate
// max frame in the wal at the time this transaction starts.
self.max_frame = shared_max;
self.max_frame_read_lock_index.set(lock_idx);
self.max_frame_read_lock_index.set(lock_0_idx);
self.min_frame = nbackfills + 1;
self.last_checksum = last_checksum;
return Ok((LimboResult::Ok, db_changed));
@@ -964,7 +964,7 @@ impl Wal for WalFile {
}
// Snapshot is stale, give up and let caller retry from scratch
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch");
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame, shared_max);
shared.write_lock.unlock();
Ok(LimboResult::Busy)
}
@@ -999,8 +999,18 @@ impl Wal for WalFile {
"frame_watermark must be >= than current WAL backfill amount: frame_watermark={:?}, nBackfill={}", frame_watermark, self.get_shared().nbackfills.load(Ordering::Acquire)
);
// if we are holding read_lock 0, skip and read right from db file.
if self.max_frame_read_lock_index.get() == 0 {
// if we are holding read_lock 0 and didn't write anything to the WAL, skip and read right from db file.
//
// note, that max_frame_read_lock_index is set to 0 only when shared_max_frame == nbackfill in which case
// min_frame is set to nbackfill + 1 and max_frame is set to shared_max_frame
//
// by default, SQLite tries to restart log file in this case - but for now let's keep it simple in the turso-db
if self.max_frame_read_lock_index.get() == 0 && self.max_frame < self.min_frame {
tracing::debug!(
"find_frame(page_id={}, frame_watermark={:?}): max_frame is 0 - read from DB file",
page_id,
frame_watermark,
);
return Ok(None);
}
let shared = self.get_shared();
@@ -1008,8 +1018,21 @@ impl Wal for WalFile {
let range = frame_watermark
.map(|x| 0..=x)
.unwrap_or(self.min_frame..=self.max_frame);
tracing::debug!(
"find_frame(page_id={}, frame_watermark={:?}): min_frame={}, max_frame={}",
page_id,
frame_watermark,
self.min_frame,
self.max_frame
);
if let Some(list) = frames.get(&page_id) {
if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) {
tracing::debug!(
"find_frame(page_id={}, frame_watermark={:?}): found frame={}",
page_id,
frame_watermark,
*f
);
return Ok(Some(*f));
}
}

View File

@@ -926,3 +926,53 @@ fn test_db_share_same_file() {
]]
);
}
#[test]
fn test_wal_api_simulate_spilled_frames() {
let (mut rng, _) = rng_from_time();
let db1 = TempDatabase::new_empty(false);
let conn1 = db1.connect_limbo();
let db2 = TempDatabase::new_empty(false);
let conn2 = db2.connect_limbo();
conn1
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
conn2
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
let watermark = conn1.wal_state().unwrap().max_frame;
for _ in 0..128 {
let key = rng.next_u32();
let length = rng.next_u32() % 4096 + 1;
conn1
.execute(format!(
"INSERT INTO t VALUES ({key}, randomblob({length}))"
))
.unwrap();
}
let mut frame = [0u8; 24 + 4096];
conn2
.checkpoint(CheckpointMode::Truncate {
upper_bound_inclusive: None,
})
.unwrap();
conn2.wal_insert_begin().unwrap();
let frames_count = conn1.wal_state().unwrap().max_frame;
for frame_id in watermark + 1..=frames_count {
let mut info = conn1.wal_get_frame(frame_id, &mut frame).unwrap();
info.db_size = 0;
info.put_to_frame_header(&mut frame);
conn2
.wal_insert_frame(frame_id - watermark, &frame)
.unwrap();
}
for _ in 0..128 {
let key = rng.next_u32();
let length = rng.next_u32() % 4096 + 1;
conn2
.execute(format!(
"INSERT INTO t VALUES ({key}, randomblob({length}))"
))
.unwrap();
}
}

View File

@@ -284,6 +284,7 @@ fn test_wal_checkpoint() -> anyhow::Result<()> {
let conn = tmp_db.connect_limbo();
for i in 0..iterations {
log::info!("iteration #{i}");
let insert_query = format!("INSERT INTO test VALUES ({i})");
do_flush(&conn, &tmp_db)?;
conn.checkpoint(CheckpointMode::Passive {
@@ -823,7 +824,7 @@ pub fn run_query_core(
on_row(row)
}
}
_ => unreachable!(),
r => panic!("unexpected step result: {r:?}"),
}
}
};