From c62b87d9b6b0583bfd5435110c0d340cd5219dcb Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 22 Aug 2025 17:16:39 +0400 Subject: [PATCH] read from database file only if max_frame_read_lock_index is 0 and max_frame < min_frame - a transaction which was started with max_frame = 0 and max_frame_read_lock_index = 0 can write to the WAL, and in this case it needs to read data back from the WAL - without cache spilling it's hard to reproduce this issue in turso-db now, but I stumbled into it with the sync-engine, which does weird stuff with the WAL that "simulates" cache spilling behaviour to some extent --- core/storage/pager.rs | 33 ++++++------ core/storage/wal.rs | 35 ++++++++++--- tests/integration/functions/test_wal_api.rs | 50 +++++++++++++++++++ .../query_processing/test_write_path.rs | 3 +- 4 files changed, 99 insertions(+), 22 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index c718485c1..9a98d30d7 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1416,22 +1416,25 @@ impl Pager { "wal_insert_frame() called on database without WAL".to_string(), )); }; - let mut wal = wal.borrow_mut(); let (header, raw_page) = parse_wal_frame_header(frame); - wal.write_frame_raw( - self.buffer_pool.clone(), - frame_no, - header.page_number as u64, - header.db_size as u64, - raw_page, - )?; - if let Some(page) = self.cache_get(header.page_number as usize) { - let content = page.get_contents(); - content.as_ptr().copy_from_slice(raw_page); - turso_assert!( - page.get().id == header.page_number as usize, - "page has unexpected id" - ); + + { + let mut wal = wal.borrow_mut(); + wal.write_frame_raw( + self.buffer_pool.clone(), + frame_no, + header.page_number as u64, + header.db_size as u64, + raw_page, + )?; + if let Some(page) = self.cache_get(header.page_number as usize) { + let content = page.get_contents(); + content.as_ptr().copy_from_slice(raw_page); + turso_assert!( + page.get().id == header.page_number as usize, + "page has 
unexpected id" + ); + } } if header.page_number == 1 { let db_size = self diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 35882bf7d..a875af365 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -815,14 +815,14 @@ impl Wal for WalFile { // WAL and fetch pages directly from the DB file. We do this // by taking read‑lock 0, and capturing the latest state. if shared_max == nbackfills { - let lock_idx = 0; - if !self.get_shared().read_locks[lock_idx].read() { + let lock_0_idx = 0; + if !self.get_shared().read_locks[lock_0_idx].read() { return Ok((LimboResult::Busy, db_changed)); } // we need to keep self.max_frame set to the appropriate // max frame in the wal at the time this transaction starts. self.max_frame = shared_max; - self.max_frame_read_lock_index.set(lock_idx); + self.max_frame_read_lock_index.set(lock_0_idx); self.min_frame = nbackfills + 1; self.last_checksum = last_checksum; return Ok((LimboResult::Ok, db_changed)); @@ -965,7 +965,7 @@ impl Wal for WalFile { } // Snapshot is stale, give up and let caller retry from scratch - tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch"); + tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame, shared_max); shared.write_lock.unlock(); Ok(LimboResult::Busy) } @@ -1000,8 +1000,18 @@ impl Wal for WalFile { "frame_watermark must be >= than current WAL backfill amount: frame_watermark={:?}, nBackfill={}", frame_watermark, self.get_shared().nbackfills.load(Ordering::Acquire) ); - // if we are holding read_lock 0, skip and read right from db file. - if self.max_frame_read_lock_index.get() == 0 { + // if we are holding read_lock 0 and didn't write anything to the WAL, skip and read right from db file. 
+ // + // note, that max_frame_read_lock_index is set to 0 only when shared_max_frame == nbackfill in which case + // min_frame is set to nbackfill + 1 and max_frame is set to shared_max_frame + // + // by default, SQLite tries to restart log file in this case - but for now let's keep it simple in the turso-db + if self.max_frame_read_lock_index.get() == 0 && self.max_frame < self.min_frame { + tracing::debug!( + "find_frame(page_id={}, frame_watermark={:?}): max_frame is 0 - read from DB file", + page_id, + frame_watermark, + ); return Ok(None); } let shared = self.get_shared(); @@ -1009,8 +1019,21 @@ impl Wal for WalFile { let range = frame_watermark .map(|x| 0..=x) .unwrap_or(self.min_frame..=self.max_frame); + tracing::debug!( + "find_frame(page_id={}, frame_watermark={:?}): min_frame={}, max_frame={}", + page_id, + frame_watermark, + self.min_frame, + self.max_frame + ); if let Some(list) = frames.get(&page_id) { if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) { + tracing::debug!( + "find_frame(page_id={}, frame_watermark={:?}): found frame={}", + page_id, + frame_watermark, + *f + ); return Ok(Some(*f)); } } diff --git a/tests/integration/functions/test_wal_api.rs b/tests/integration/functions/test_wal_api.rs index 38537000a..069b0b084 100644 --- a/tests/integration/functions/test_wal_api.rs +++ b/tests/integration/functions/test_wal_api.rs @@ -926,3 +926,53 @@ fn test_db_share_same_file() { ]] ); } + +#[test] +fn test_wal_api_simulate_spilled_frames() { + let (mut rng, _) = rng_from_time(); + let db1 = TempDatabase::new_empty(false); + let conn1 = db1.connect_limbo(); + let db2 = TempDatabase::new_empty(false); + let conn2 = db2.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn2 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + let watermark = conn1.wal_state().unwrap().max_frame; + for _ in 0..128 { + let key = rng.next_u32(); + let length = rng.next_u32() % 4096 + 1; + conn1 
+ .execute(format!( + "INSERT INTO t VALUES ({key}, randomblob({length}))" + )) + .unwrap(); + } + let mut frame = [0u8; 24 + 4096]; + conn2 + .checkpoint(CheckpointMode::Truncate { + upper_bound_inclusive: None, + }) + .unwrap(); + conn2.wal_insert_begin().unwrap(); + let frames_count = conn1.wal_state().unwrap().max_frame; + for frame_id in watermark + 1..=frames_count { + let mut info = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); + info.db_size = 0; + info.put_to_frame_header(&mut frame); + conn2 + .wal_insert_frame(frame_id - watermark, &frame) + .unwrap(); + } + for _ in 0..128 { + let key = rng.next_u32(); + let length = rng.next_u32() % 4096 + 1; + conn2 + .execute(format!( + "INSERT INTO t VALUES ({key}, randomblob({length}))" + )) + .unwrap(); + } +} diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index e2cbfa06e..096327db1 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -284,6 +284,7 @@ fn test_wal_checkpoint() -> anyhow::Result<()> { let conn = tmp_db.connect_limbo(); for i in 0..iterations { + log::info!("iteration #{}", i); let insert_query = format!("INSERT INTO test VALUES ({i})"); do_flush(&conn, &tmp_db)?; conn.checkpoint(CheckpointMode::Passive { @@ -823,7 +824,7 @@ pub fn run_query_core( on_row(row) } } - _ => unreachable!(), + r => panic!("unexpected step result: {r:?}"), } } };