From 435ca7fe7a4026a010f9f429b89ecb253bdce477 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 23 Jul 2025 21:05:53 +0400 Subject: [PATCH 1/5] add fuzz tests for raw WAL API --- tests/integration/common.rs | 12 +++- tests/integration/functions/test_wal_api.rs | 64 ++++++++++++++++++++- tests/integration/fuzz/mod.rs | 11 +--- 3 files changed, 75 insertions(+), 12 deletions(-) diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 3c0816b14..05207490f 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -1,4 +1,5 @@ -use rand::{rng, RngCore}; +use rand::{rng, RngCore, SeedableRng}; +use rand_chacha::ChaCha8Rng; use rusqlite::params; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -238,6 +239,15 @@ pub(crate) fn limbo_exec_rows_error( } } +pub(crate) fn rng_from_time() -> (ChaCha8Rng, u64) { + let seed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let rng = ChaCha8Rng::seed_from_u64(seed); + (rng, seed) +} + #[cfg(test)] mod tests { use std::vec; diff --git a/tests/integration/functions/test_wal_api.rs b/tests/integration/functions/test_wal_api.rs index edef8569d..48bbfdf02 100644 --- a/tests/integration/functions/test_wal_api.rs +++ b/tests/integration/functions/test_wal_api.rs @@ -1,6 +1,8 @@ +use rand::{RngCore, SeedableRng}; +use rand_chacha::ChaCha8Rng; use rusqlite::types::Value; -use crate::common::{limbo_exec_rows, TempDatabase}; +use crate::common::{limbo_exec_rows, rng_from_time, TempDatabase}; #[test] fn test_wal_frame_count() { @@ -142,3 +144,63 @@ fn test_wal_frame_far_away_write() { db1.io.wait_for_completion(c).unwrap(); assert!(conn2.wal_insert_frame(5, &frame).is_err()); } + +#[test] +fn test_wal_frame_api_no_schema_changes_fuzz() { + let (mut rng, _) = rng_from_time(); + for _ in 0..4 { + let db1 = TempDatabase::new_empty(false); + let conn1 = db1.connect_limbo(); + let db2 = TempDatabase::new_empty(false); + let conn2 = db2.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn2 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + + let seed = rng.next_u64(); + let mut rng = ChaCha8Rng::seed_from_u64(seed); + println!("SEED: {}", seed); + + let (mut size, mut synced_frame) = (0, conn2.wal_frame_count().unwrap()); + let mut commit_frames = vec![conn1.wal_frame_count().unwrap()]; + for _ in 0..256 { + if rng.next_u32() % 10 != 0 { + let key = rng.next_u32(); + let length = rng.next_u32() % (4 * 4096); + let query = format!("INSERT INTO t VALUES ({}, randomblob({}))", key, length); + // println!("{}", query); + conn1.execute(&query).unwrap(); + commit_frames.push(conn1.wal_frame_count().unwrap()); + } else { + let last_frame = conn1.wal_frame_count().unwrap(); + let next_frame = + synced_frame + (rng.next_u32() as u64 % (last_frame - synced_frame + 1)); + let mut frame = [0u8; 24 + 4096]; + // println!("sync WAL frames: [{}..{}]", synced_frame + 1, next_frame); + conn2.wal_insert_begin().unwrap(); + for frame_no in (synced_frame + 1)..=next_frame { + let c = conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap(); + db1.io.wait_for_completion(c).unwrap(); + conn2.wal_insert_frame(frame_no as u32, &frame[..]).unwrap(); + } + conn2.wal_insert_end().unwrap(); + for (i, committed) in commit_frames.iter().enumerate() { + if *committed <= next_frame { + size = size.max(i); + synced_frame = *committed; + } + } + if rng.next_u32() % 10 == 0 { + synced_frame = rng.next_u32() as u64 % synced_frame; + } + assert_eq!( + limbo_exec_rows(&db2, &conn2, "SELECT COUNT(*) FROM t"), + vec![vec![Value::Integer(size as i64)]] + ); + } + } + } +} diff --git a/tests/integration/fuzz/mod.rs b/tests/integration/fuzz/mod.rs index 2798c8006..e56d7db64 100644 --- a/tests/integration/fuzz/mod.rs +++ b/tests/integration/fuzz/mod.rs @@ -10,21 +10,12 @@ mod tests { use rusqlite::params; use crate::{ - common::{limbo_exec_rows, sqlite_exec_rows, TempDatabase}, + common::{limbo_exec_rows, rng_from_time, sqlite_exec_rows, TempDatabase}, fuzz::grammar_generator::{const_str, rand_int, rand_str, GrammarGenerator}, }; use super::grammar_generator::SymbolHandle; - fn rng_from_time() -> (ChaCha8Rng, u64) { - let seed = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(); - let rng = ChaCha8Rng::seed_from_u64(seed); - (rng, seed) - } - /// [See this issue for more info](https://github.com/tursodatabase/turso/issues/1763) #[test] pub fn fuzz_failure_issue_1763() { From 4a8030670583e50bbf05972cd6b9684bee515d2d Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 23 Jul 2025 21:09:09 +0400 Subject: [PATCH 2/5] fix wal insert frame raw API - we need to properly mark pages as dirty after insertion --- core/lib.rs | 10 ++++---- core/storage/pager.rs | 28 +++++++++++++++++++++-- core/storage/sqlite3_ondisk.rs | 8 ++++--- core/storage/wal.rs | 42 +++++++++++++++++++--------------- 4 files changed, 59 insertions(+), 29 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index a83605459..2c7c0731a 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -840,13 +840,13 @@ impl Connection { /// Finish WAL session by ending read+write transaction taken in the [Self::wal_insert_begin] method /// All frames written after last commit frame (db_size > 0) within the session will be rolled back #[cfg(feature = "fs")] - pub fn wal_insert_end(&self) -> Result<()> { + pub fn wal_insert_end(self: &Arc) -> Result<()> { let pager = self.pager.borrow(); - let mut wal = pager.wal.borrow_mut(); - // remove all non-commited changes in case if WAL session left some suffix without commit frame - wal.rollback() - .expect("wal must be able to rollback any non-commited changes"); + // remove all non-commited changes in case if WAL session left some suffix without commit frame + pager.rollback(false, self).expect("rollback must succeed"); + + let wal = pager.wal.borrow_mut(); wal.end_write_tx(); wal.end_read_tx(); Ok(()) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 8df34199b..4cf6f14d3 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -3,7 +3,9 @@ use crate::storage::btree::BTreePageInner; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::header_accessor; -use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType}; +use crate::storage::sqlite3_ondisk::{ + self, parse_wal_frame_header, DatabaseHeader, PageContent, PageType, +}; use crate::storage::wal::{CheckpointResult, Wal}; use crate::types::IOResult; use crate::util::IOExt as _; @@ -1018,7 +1020,29 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<()> { let mut wal = self.wal.borrow_mut(); - wal.write_frame_raw(self.buffer_pool.clone(), frame_no as u64, frame) + let (header, raw_page) = parse_wal_frame_header(frame); + wal.write_frame_raw( + self.buffer_pool.clone(), + frame_no as u64, + header.page_number as u64, + header.db_size as u64, + raw_page, + )?; + if let Some(page) = self.cache_get(header.page_number as usize) { + let content = page.get_contents(); + content.as_ptr().copy_from_slice(raw_page); + self.add_dirty(header.page_number as usize, &page); + } + if header.db_size > 0 { + for page_id in self.dirty_pages.borrow().iter() { + let page_key = PageCacheKey::new(*page_id); + let mut cache = self.page_cache.write(); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + page.clear_dirty(); + } + self.dirty_pages.borrow_mut().clear(); + } + Ok(()) } #[instrument(skip_all, level = Level::DEBUG, name = "pager_checkpoint",)] diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index d0d5249ca..d75571393 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1546,21 +1546,23 @@ pub fn begin_read_wal_frame( Ok(c) } -pub fn parse_wal_frame_header(frame: &[u8]) -> WalFrameHeader { +pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) { let page_number = u32::from_be_bytes(frame[0..4].try_into().unwrap()); let db_size = u32::from_be_bytes(frame[4..8].try_into().unwrap()); let salt_1 = u32::from_be_bytes(frame[8..12].try_into().unwrap()); let salt_2 = u32::from_be_bytes(frame[12..16].try_into().unwrap()); let checksum_1 = u32::from_be_bytes(frame[16..20].try_into().unwrap()); let checksum_2 = u32::from_be_bytes(frame[20..24].try_into().unwrap()); - WalFrameHeader { + let header = WalFrameHeader { page_number, db_size, salt_1, salt_2, checksum_1, checksum_2, - } + }; + let page = &frame[WAL_FRAME_HEADER_SIZE..]; + (header, page) } pub fn prepare_wal_frame( diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 63297f7c2..0c29ac161 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -20,8 +20,8 @@ use crate::fast_lock::SpinLock; use crate::io::{File, IO}; use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::{ - begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, parse_wal_frame_header, - prepare_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, + begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame, + WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::types::IOResult; use crate::{turso_assert, Buffer, LimboError, Result}; @@ -223,7 +223,9 @@ pub trait Wal { &mut self, buffer_pool: Arc, frame_id: u64, - frame: &[u8], + page_id: u64, + db_size: u64, + page: &[u8], ) -> Result<()>; /// Write a frame to the WAL. @@ -296,7 +298,9 @@ impl Wal for DummyWAL { &mut self, _buffer_pool: Arc, _frame_id: u64, - _frame: &[u8], + _page_id: u64, + _db_size: u64, + _page: &[u8], ) -> Result<()> { todo!(); } @@ -659,15 +663,16 @@ impl Wal for WalFile { &mut self, buffer_pool: Arc, frame_id: u64, - frame: &[u8], + page_id: u64, + db_size: u64, + page: &[u8], ) -> Result<()> { tracing::debug!("write_raw_frame({})", frame_id); - let expected_frame_len = WAL_FRAME_HEADER_SIZE + self.page_size() as usize; - if frame.len() != expected_frame_len { + if page.len() != self.page_size() as usize { return Err(LimboError::InvalidArgument(format!( - "unexpected frame size: got={}, expected={}", - frame.len(), - expected_frame_len + "unexpected page size in frame: got={}, expected={}", + page.len(), + self.page_size(), ))); } if frame_id > self.max_frame + 1 { @@ -681,7 +686,7 @@ impl Wal for WalFile { // just validate if page content from the frame matches frame in the WAL let offset = self.frame_offset(frame_id); let conflict = Arc::new(Cell::new(false)); - let (frame_ptr, frame_len) = (frame.as_ptr(), frame.len()); + let (page_ptr, page_len) = (page.as_ptr(), page.len()); let complete = Box::new({ let conflict = conflict.clone(); move |buf: Arc>, bytes_read: i32| { @@ -691,8 +696,8 @@ impl Wal for WalFile { bytes_read == buf_len as i32, "read({bytes_read}) != expected({buf_len})" ); - let frame = unsafe { std::slice::from_raw_parts(frame_ptr, frame_len) }; - if buf.as_slice() != &frame[WAL_FRAME_HEADER_SIZE..] { + let page = unsafe { std::slice::from_raw_parts(page_ptr, page_len) }; + if buf.as_slice() != page { conflict.set(true); } } @@ -719,20 +724,19 @@ impl Wal for WalFile { let header = shared.wal_header.clone(); let header = header.lock(); let checksums = self.last_checksum; - let frame_header = parse_wal_frame_header(frame); let (checksums, frame_bytes) = prepare_wal_frame( &header, checksums, header.page_size, - frame_header.page_number, - frame_header.db_size, - &frame[WAL_FRAME_HEADER_SIZE..], + page_id as u32, + db_size as u32, + page, ); let c = Arc::new(Completion::new_write(|_| {})); let c = shared.file.pwrite(offset, frame_bytes, c)?; self.io.wait_for_completion(c)?; - self.complete_append_frame(frame_header.page_number as u64, frame_id, checksums); - if frame_header.db_size > 0 { + self.complete_append_frame(page_id, frame_id, checksums); + if db_size > 0 { self.finish_append_frames_commit()?; } Ok(()) From fb83862013d2fd7842c39f32af4e1124b8645145 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 23 Jul 2025 21:25:58 +0400 Subject: [PATCH 3/5] fix clippy --- tests/integration/functions/test_wal_api.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/integration/functions/test_wal_api.rs b/tests/integration/functions/test_wal_api.rs index 48bbfdf02..10f5f6226 100644 --- a/tests/integration/functions/test_wal_api.rs +++ b/tests/integration/functions/test_wal_api.rs @@ -162,7 +162,7 @@ fn test_wal_frame_api_no_schema_changes_fuzz() { let seed = rng.next_u64(); let mut rng = ChaCha8Rng::seed_from_u64(seed); - println!("SEED: {}", seed); + println!("SEED: {seed}"); let (mut size, mut synced_frame) = (0, conn2.wal_frame_count().unwrap()); let mut commit_frames = vec![conn1.wal_frame_count().unwrap()]; @@ -170,8 +170,7 @@ fn test_wal_frame_api_no_schema_changes_fuzz() { if rng.next_u32() % 10 != 0 { let key = rng.next_u32(); let length = rng.next_u32() % (4 * 4096); - let query = format!("INSERT INTO t VALUES ({}, randomblob({}))", key, length); - // println!("{}", query); + let query = format!("INSERT INTO t VALUES ({key}, randomblob({length}))"); conn1.execute(&query).unwrap(); commit_frames.push(conn1.wal_frame_count().unwrap()); } else { @@ -179,7 +178,6 @@ fn test_wal_frame_api_no_schema_changes_fuzz() { let next_frame = synced_frame + (rng.next_u32() as u64 % (last_frame - synced_frame + 1)); let mut frame = [0u8; 24 + 4096]; - // println!("sync WAL frames: [{}..{}]", synced_frame + 1, next_frame); conn2.wal_insert_begin().unwrap(); for frame_no in (synced_frame + 1)..=next_frame { let c = conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap(); From 3d2a38eb887752f65cec273cf0c2350c4a252633 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 24 Jul 2025 11:45:28 +0400 Subject: [PATCH 4/5] add simple helper --- core/storage/pager.rs | 2 +- core/storage/sqlite3_ondisk.rs | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 4cf6f14d3..4bf393967 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1033,7 +1033,7 @@ impl Pager { content.as_ptr().copy_from_slice(raw_page); self.add_dirty(header.page_number as usize, &page); } - if header.db_size > 0 { + if header.is_commit_frame() { for page_id in self.dirty_pages.borrow().iter() { let page_key = PageCacheKey::new(*page_id); let mut cache = self.page_cache.write(); diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index d75571393..f4b43df41 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -245,6 +245,12 @@ pub struct WalFrameHeader { pub(crate) checksum_2: u32, } +impl WalFrameHeader { + pub fn is_commit_frame(&self) -> bool { + self.db_size > 0 + } +} + impl Default for DatabaseHeader { fn default() -> Self { Self { From edd6ef2d214343d4b4aa989057a3ebc88505c800 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 24 Jul 2025 11:51:33 +0400 Subject: [PATCH 5/5] fix after rebase --- core/storage/pager.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 4bf393967..21f905444 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1031,7 +1031,11 @@ impl Pager { if let Some(page) = self.cache_get(header.page_number as usize) { let content = page.get_contents(); content.as_ptr().copy_from_slice(raw_page); - self.add_dirty(header.page_number as usize, &page); + turso_assert!( + page.get().id == header.page_number as usize, + "page has unexpected id" + ); + self.add_dirty(&page); } if header.is_commit_frame() { for page_id in self.dirty_pages.borrow().iter() {