From ee58b7bd86d7f0d69011accaf444ac683d6cced9 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 12 Aug 2025 20:30:10 +0300 Subject: [PATCH 01/12] Add fn read_header() to DatabaseStorage trait --- bindings/javascript/src/lib.rs | 9 +++++++++ core/storage/database.rs | 11 +++++++++++ 2 files changed, 20 insertions(+) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 0d50c6a8f..26af16cc4 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -473,6 +473,15 @@ impl DatabaseFile { } impl turso_core::DatabaseStorage for DatabaseFile { + fn read_header(&self, c: turso_core::Completion) -> turso_core::Result { + let r = c.as_read(); + let size = r.buf().len(); + assert!( + size == 100, + "the size of the database header must be 100 bytes, got {size}" + ); + self.file.pread(0, c) + } fn read_page( &self, page_idx: usize, diff --git a/core/storage/database.rs b/core/storage/database.rs index 1e82b1028..8e99bca56 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -9,6 +9,7 @@ use tracing::{instrument, Level}; /// the storage medium. A database can either be a file on disk, like in SQLite, /// or something like a remote page server service. pub trait DatabaseStorage: Send + Sync { + fn read_header(&self, c: Completion) -> Result; fn read_page(&self, page_idx: usize, c: Completion) -> Result; fn write_page(&self, page_idx: usize, buffer: Arc, c: Completion) -> Result; @@ -37,6 +38,16 @@ unsafe impl Sync for DatabaseFile {} #[cfg(feature = "fs")] impl DatabaseStorage for DatabaseFile { + #[instrument(skip_all, level = Level::DEBUG)] + fn read_header(&self, c: Completion) -> Result { + let r = c.as_read(); + let size = r.buf().len(); + assert!( + size == 100, + "the size of the database header must be 100 bytes, got {size}" + ); + self.file.pread(0, c) + } #[instrument(skip_all, level = Level::DEBUG)] fn read_page(&self, page_idx: usize, c: Completion) -> Result { let r = c.as_read(); From bb21bd93da0c16295a97dbc52f9a27f2419f4c7e Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 12 Aug 2025 20:33:58 +0300 Subject: [PATCH 02/12] Use type-safe PageSize newtype for pager.page_size --- core/lib.rs | 15 ++++++++------- core/storage/pager.rs | 13 ++++++------- core/storage/sqlite3_ondisk.rs | 2 +- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 046a042c5..6a9f63da4 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -480,20 +480,21 @@ impl Database { )?; let size = match page_size { - Some(size) => size as u32, + Some(size) => PageSize::new(size as u32).unwrap_or_else(|| { + panic!("invalid page size: {size}"); + }), None => { pager // if None is passed in, we know that we already initialized so we can safely call `with_header` here .io .block(|| pager.with_header(|header| header.page_size)) .unwrap_or_default() - .get() } }; pager.page_size.set(Some(size)); let wal_path = format!("{}-wal", self.path); let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?; - let real_shared_wal = WalFileShared::new_shared(size, &self.io, file)?; + let real_shared_wal = WalFileShared::new_shared(size.get(), &self.io, file)?; // Modify Database::maybe_shared_wal to point to the new WAL file so that other connections // can open the existing WAL. *maybe_shared_wal = Some(real_shared_wal.clone()); @@ -1476,18 +1477,18 @@ impl Connection { /// is first created, if it does not already exist when the page_size pragma is issued, /// or at the next VACUUM command that is run on the same database connection while not in WAL mode. pub fn reset_page_size(&self, size: u32) -> Result<()> { - if PageSize::new(size).is_none() { + let Some(size) = PageSize::new(size) else { return Ok(()); - } + }; - self.page_size.set(size); + self.page_size.set(size.get()); if self._db.db_state.get() != DbState::Uninitialized { return Ok(()); } *self._db.maybe_shared_wal.write() = None; self.pager.borrow_mut().clear_page_cache(); - let pager = self._db.init_pager(Some(size as usize))?; + let pager = self._db.init_pager(Some(size.get() as usize))?; self.pager.replace(Rc::new(pager)); self.pager.borrow().set_initial_page_size(size); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index ffe236cc7..d42174b65 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -424,7 +424,7 @@ pub struct Pager { /// Cache page_size and reserved_space at Pager init and reuse for subsequent /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality /// to change it. - pub(crate) page_size: Cell>, + pub(crate) page_size: Cell>, reserved_space: OnceCell, free_page_state: RefCell, /// Maximum number of pages allowed in the database. Default is 1073741823 (SQLite default). @@ -888,7 +888,6 @@ impl Pager { self.io .block(|| self.with_header(|header| header.page_size)) .unwrap_or_default() - .get() }); let reserved_space = *self.reserved_space.get_or_init(|| { @@ -897,11 +896,11 @@ impl Pager { .unwrap_or_default() }); - (page_size as usize) - (reserved_space as usize) + (page_size.get() as usize) - (reserved_space as usize) } /// Set the initial page size for the database. Should only be called before the database is initialized - pub fn set_initial_page_size(&self, size: u32) { + pub fn set_initial_page_size(&self, size: PageSize) { assert_eq!(self.db_state.get(), DbState::Uninitialized); self.page_size.replace(Some(size)); } @@ -1390,8 +1389,8 @@ impl Pager { .io .block(|| self.with_header(|header| header.database_size))? .get(); - let page_size = self.page_size.get().unwrap_or(PageSize::DEFAULT as u32); - let expected = (db_size * page_size) as u64; + let page_size = self.page_size.get().unwrap_or_default(); + let expected = (db_size * page_size.get()) as u64; if expected < self.db_file.size()? { self.io.wait_for_completion(self.db_file.truncate( expected as usize, @@ -1559,7 +1558,7 @@ impl Pager { default_header.database_size = 1.into(); if let Some(size) = self.page_size.get() { - default_header.page_size = PageSize::new(size).expect("page size"); + default_header.page_size = size; } self.buffer_pool .finalize_with_page_size(default_header.page_size.get() as usize)?; diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 47640186c..63fbf3c6e 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -947,7 +947,7 @@ pub fn write_pages_vectored( // batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's // to submit as `writev`/write_pages calls. - let page_sz = pager.page_size.get().unwrap_or(PageSize::DEFAULT as u32) as usize; + let page_sz = pager.page_size.get().expect("page size is not set").get() as usize; // Count expected number of runs to create the atomic counter we need to track each batch let mut run_count = 0; From f5e27f23ad4786ac1875dbbd5830e93e8f52fa06 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 12 Aug 2025 20:35:21 +0300 Subject: [PATCH 03/12] Use type-safe PageSize newtype for connection.page_size --- core/lib.rs | 9 ++++----- core/translate/pragma.rs | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 6a9f63da4..c1c2d943a 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -391,8 +391,7 @@ impl Database { let page_size = pager .io .block(|| pager.with_header(|header| header.page_size)) - .unwrap_or_default() - .get(); + .unwrap_or_default(); let default_cache_size = pager .io @@ -784,7 +783,7 @@ pub struct Connection { cache_size: Cell, /// page size used for an uninitialized database or the next vacuum command. /// it's not always equal to the current page size of the database - page_size: Cell, + page_size: Cell, /// Disable automatic checkpoint behaviour when DB is shutted down or WAL reach certain size /// Client still can manually execute PRAGMA wal_checkpoint(...) commands wal_auto_checkpoint_disabled: Cell, @@ -1439,7 +1438,7 @@ impl Connection { pub fn set_capture_data_changes(&self, opts: CaptureDataChangesMode) { self.capture_data_changes.replace(opts); } - pub fn get_page_size(&self) -> u32 { + pub fn get_page_size(&self) -> PageSize { self.page_size.get() } @@ -1481,7 +1480,7 @@ impl Connection { return Ok(()); }; - self.page_size.set(size.get()); + self.page_size.set(size); if self._db.db_state.get() != DbState::Uninitialized { return Ok(()); } diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index bea505c6b..46d09180d 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -484,7 +484,7 @@ fn query_pragma( pager .io .block(|| pager.with_header(|header| header.page_size.get())) - .unwrap_or(connection.get_page_size()) as i64, + .unwrap_or(connection.get_page_size().get()) as i64, register, ); program.emit_result_row(register, 1); From f8620a986979d34499dc9ab7b1ce0341690aa9de Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 12 Aug 2025 20:36:28 +0300 Subject: [PATCH 04/12] Use non-hardcoded size for BTreeCursor immutablerecord --- core/storage/btree.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 58332b05a..0d4fe9414 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -5261,7 +5261,13 @@ impl BTreeCursor { fn get_immutable_record_or_create(&self) -> std::cell::RefMut<'_, Option> { if self.reusable_immutable_record.borrow().is_none() { - let record = ImmutableRecord::new(4096); + let page_size = self + .pager + .page_size + .get() + .expect("page size is not set") + .get(); + let record = ImmutableRecord::new(page_size as usize); self.reusable_immutable_record.replace(Some(record)); } self.reusable_immutable_record.borrow_mut() From c75e4c1092387b9bbd01c713bd2e3f7b4b169cc8 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 12 Aug 2025 21:08:49 +0300 Subject: [PATCH 05/12] Fix non-4096 page sizes by making WAL header lazy --- bindings/javascript/src/lib.rs | 6 --- core/lib.rs | 91 ++++++++++++++++++++++--------- core/storage/btree.rs | 2 +- core/storage/database.rs | 6 --- core/storage/pager.rs | 14 +++-- core/storage/wal.rs | 97 +++++++++++++++++----------------- core/vdbe/execute.rs | 5 +- 7 files changed, 129 insertions(+), 92 deletions(-) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 26af16cc4..5c06766cb 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -474,12 +474,6 @@ impl DatabaseFile { impl turso_core::DatabaseStorage for DatabaseFile { fn read_header(&self, c: turso_core::Completion) -> turso_core::Result { - let r = c.as_read(); - let size = r.buf().len(); - assert!( - size == 100, - "the size of the database header must be 100 bytes, got {size}" - ); self.file.pread(0, c) } fn read_page( diff --git a/core/lib.rs b/core/lib.rs index c1c2d943a..5fbf8b751 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -388,10 +388,7 @@ impl Database { pub fn connect(self: &Arc) -> Result> { let pager = self.init_pager(None)?; - let page_size = pager - .io - .block(|| pager.with_header(|header| header.page_size)) - .unwrap_or_default(); + let page_size = pager.page_size.get().expect("page size not set"); let default_cache_size = pager .io @@ -436,16 +433,67 @@ impl Database { self.open_flags.contains(OpenFlags::ReadOnly) } - fn init_pager(&self, page_size: Option) -> Result { + /// If we do not have a physical WAL file, but we know the database file is initialized on disk, + /// we need to read the page_size from the database header. + fn read_page_size_from_db_header(&self) -> Result { + turso_assert!( + self.db_state.is_initialized(), + "read_page_size_from_db_header called on uninitialized database" + ); + let buf = Arc::new(Buffer::new_temporary(PageSize::MIN as usize)); // this needs to be a block multiple for O_DIRECT + let c = Completion::new_read(buf.clone(), move |_buf, _| {}); + let c = self.db_file.read_header(c)?; + self.io.wait_for_completion(c)?; + let page_size = u16::from_be_bytes(buf.as_slice()[16..18].try_into().unwrap()); + Ok(PageSize::new(page_size as u32).unwrap_or_else(|| { + panic!("invalid page size in DB header: {page_size}"); + })) + } + + /// Read the page size in order of preference: + /// 1. From the WAL header if it exists and is initialized + /// 2. From the database header if the database is initialized + /// + /// Otherwise, fall back to, in order of preference: + /// 1. From the requested page size if it is provided + /// 2. PageSize::default(), i.e. 4096 + fn determine_actual_page_size( + &self, + maybe_shared_wal: Option<&WalFileShared>, + requested_page_size: Option, + ) -> Result { + if let Some(shared_wal) = maybe_shared_wal { + let size_in_wal = shared_wal.page_size(); + if size_in_wal != 0 { + return Ok(PageSize::new(size_in_wal).unwrap_or_else(|| { + panic!("invalid page size in WAL: {size_in_wal}"); + })); + } + } + if self.db_state.is_initialized() { + Ok(self.read_page_size_from_db_header()?) + } else { + let Some(size) = requested_page_size else { + return Ok(PageSize::default()); + }; + Ok(PageSize::new(size as u32).unwrap_or_else(|| { + panic!("invalid requested page size: {size}"); + })) + } + } + + fn init_pager(&self, requested_page_size: Option) -> Result { // Open existing WAL file if present let mut maybe_shared_wal = self.maybe_shared_wal.write(); if let Some(shared_wal) = maybe_shared_wal.clone() { - let size = match page_size { - None => unsafe { (*shared_wal.get()).page_size() as usize }, - Some(size) => size, - }; + let page_size = self.determine_actual_page_size( + Some(unsafe { &*shared_wal.get() }), + requested_page_size, + )?; let buffer_pool = self.buffer_pool.clone(); - buffer_pool.finalize_with_page_size(size)?; + if self.db_state.is_initialized() { + buffer_pool.finalize_with_page_size(page_size.get() as usize)?; + } let db_state = self.db_state.clone(); let wal = Rc::new(RefCell::new(WalFile::new( @@ -462,10 +510,17 @@ impl Database { db_state, self.init_lock.clone(), )?; + pager.page_size.set(Some(page_size)); return Ok(pager); } let buffer_pool = self.buffer_pool.clone(); + let page_size = self.determine_actual_page_size(None, requested_page_size)?; + + if self.db_state.is_initialized() { + buffer_pool.finalize_with_page_size(page_size.get() as usize)?; + } + // No existing WAL; create one. let db_state = self.db_state.clone(); let mut pager = Pager::new( @@ -478,22 +533,10 @@ impl Database { Arc::new(Mutex::new(())), )?; - let size = match page_size { - Some(size) => PageSize::new(size as u32).unwrap_or_else(|| { - panic!("invalid page size: {size}"); - }), - None => { - pager // if None is passed in, we know that we already initialized so we can safely call `with_header` here - .io - .block(|| pager.with_header(|header| header.page_size)) - .unwrap_or_default() - } - }; - - pager.page_size.set(Some(size)); + pager.page_size.set(Some(page_size)); let wal_path = format!("{}-wal", self.path); let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?; - let real_shared_wal = WalFileShared::new_shared(size.get(), &self.io, file)?; + let real_shared_wal = WalFileShared::new_shared(file)?; // Modify Database::maybe_shared_wal to point to the new WAL file so that other connections // can open the existing WAL. *maybe_shared_wal = Some(real_shared_wal.clone()); diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 0d4fe9414..214dfde9a 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -8469,7 +8469,7 @@ mod tests { )); let wal_file = io.open_file("test.wal", OpenFlags::Create, false).unwrap(); - let wal_shared = WalFileShared::new_shared(page_size as u32, &io, wal_file).unwrap(); + let wal_shared = WalFileShared::new_shared(wal_file).unwrap(); let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), wal_shared, diff --git a/core/storage/database.rs b/core/storage/database.rs index 8e99bca56..ddcf39827 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -40,12 +40,6 @@ unsafe impl Sync for DatabaseFile {} impl DatabaseStorage for DatabaseFile { #[instrument(skip_all, level = Level::DEBUG)] fn read_header(&self, c: Completion) -> Result { - let r = c.as_read(); - let size = r.buf().len(); - assert!( - size == 100, - "the size of the database header must be 100 bytes, got {size}" - ); self.file.pread(0, c) } #[instrument(skip_all, level = Level::DEBUG)] diff --git a/core/storage/pager.rs b/core/storage/pager.rs index d42174b65..988e07f6c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1147,7 +1147,11 @@ impl Pager { ); page }; - let c = wal.borrow_mut().append_frame(page.clone(), 0)?; + let c = wal.borrow_mut().append_frame( + page.clone(), + self.page_size.get().expect("page size not set"), + 0, + )?; // TODO: invalidade previous completions if this one fails completions.push(c); } @@ -1207,7 +1211,11 @@ impl Pager { }; // TODO: invalidade previous completions on error here - let c = wal.borrow_mut().append_frame(page.clone(), db_size)?; + let c = wal.borrow_mut().append_frame( + page.clone(), + self.page_size.get().expect("page size not set"), + db_size, + )?; completions.push(c); } self.dirty_pages.borrow_mut().clear(); @@ -2214,8 +2222,6 @@ mod ptrmap_tests { let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), WalFileShared::new_shared( - page_size, - &io, io.open_file("test.db-wal", OpenFlags::Create, false) .unwrap(), ) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index b02744101..61a42543d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -16,7 +16,7 @@ use crate::io::{File, IO}; use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame, - write_pages_vectored, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, + write_pages_vectored, PageSize, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::types::{IOCompletions, IOResult}; use crate::{turso_assert, Buffer, LimboError, Result}; @@ -257,7 +257,12 @@ pub trait Wal { /// db_size > 0 -> last frame written in transaction /// db_size == 0 -> non-last frame written in transaction /// write_counter is the counter we use to track when the I/O operation starts and completes - fn append_frame(&mut self, page: PageRef, db_size: u32) -> Result; + fn append_frame( + &mut self, + page: PageRef, + page_size: PageSize, + db_size: u32, + ) -> Result; /// Complete append of frames by updating shared wal state. Before this /// all changes were stored locally. @@ -951,9 +956,9 @@ impl Wal for WalFile { db_size: u64, page: &[u8], ) -> Result<()> { - if self.get_max_frame_in_wal() == 0 { - self.ensure_header_if_needed()?; - } + self.ensure_header_if_needed(PageSize::new(page.len() as u32).unwrap_or_else(|| { + panic!("invalid page size: {}", page.len()); + }))?; tracing::debug!("write_raw_frame({})", frame_id); if page.len() != self.page_size() as usize { return Err(LimboError::InvalidArgument(format!( @@ -1031,11 +1036,20 @@ impl Wal for WalFile { /// Write a frame to the WAL. #[instrument(skip_all, level = Level::DEBUG)] - fn append_frame(&mut self, page: PageRef, db_size: u32) -> Result { + fn append_frame( + &mut self, + page: PageRef, + page_size: PageSize, + db_size: u32, + ) -> Result { + self.ensure_header_if_needed(page_size)?; let shared = self.get_shared(); - if shared.max_frame.load(Ordering::Acquire).eq(&0) { - self.ensure_header_if_needed()?; - } + let shared_page_size = shared.wal_header.lock().page_size; + turso_assert!( + shared_page_size == page_size.get(), + "page size mismatch - tried to change page size after WAL header was already initialized: shared.page_size={shared_page_size}, page_size={}", + page_size.get() + ); let page_id = page.get().id; let frame_id = self.max_frame + 1; let offset = self.frame_offset(frame_id); @@ -1283,20 +1297,26 @@ impl WalFile { /// the WAL file has been truncated and we are writing the first /// frame since then. We need to ensure that the header is initialized. - fn ensure_header_if_needed(&mut self) -> Result<()> { + fn ensure_header_if_needed(&mut self, page_size: PageSize) -> Result<()> { + if self.get_shared().is_initialized()? { + return Ok(()); + } tracing::debug!("ensure_header_if_needed"); self.last_checksum = { let shared = self.get_shared(); - if shared.max_frame.load(Ordering::Acquire) != 0 { - return Ok(()); - } - if shared.file.size()? >= WAL_HEADER_SIZE as u64 { - return Ok(()); - } - let mut hdr = shared.wal_header.lock(); + hdr.magic = if cfg!(target_endian = "big") { + WAL_MAGIC_BE + } else { + WAL_MAGIC_LE + }; if hdr.page_size == 0 { - hdr.page_size = self.page_size(); + hdr.page_size = page_size.get(); + } + if hdr.salt_1 == 0 { + hdr.salt_1 = self.io.generate_random_number() as u32; + turso_assert!(hdr.salt_2 == 0, "salt_2 must be zero"); + hdr.salt_2 = self.io.generate_random_number() as u32; } // recompute header checksum @@ -1729,47 +1749,26 @@ impl WalFileShared { } } - pub fn new_shared( - page_size: u32, - io: &Arc, - file: Arc, - ) -> Result>> { + pub fn is_initialized(&self) -> Result { + Ok(self.file.size()? >= WAL_HEADER_SIZE as u64) + } + + pub fn new_shared(file: Arc) -> Result>> { let magic = if cfg!(target_endian = "big") { WAL_MAGIC_BE } else { WAL_MAGIC_LE }; - let mut wal_header = WalHeader { + let wal_header = WalHeader { magic, file_format: 3007000, - page_size, + page_size: 0, // Signifies WAL header that is not persistent on disk yet. checkpoint_seq: 0, // TODO implement sequence number - salt_1: io.generate_random_number() as u32, - salt_2: io.generate_random_number() as u32, + salt_1: 0, + salt_2: 0, checksum_1: 0, checksum_2: 0, }; - let native = cfg!(target_endian = "big"); // if target_endian is - // already big then we don't care but if isn't, header hasn't yet been - // encoded to big endian, therefore we want to swap bytes to compute this - // checksum. - let checksums = (0, 0); - let checksums = checksum_wal( - &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes - &wal_header, - checksums, - native, // this is false because we haven't encoded the wal header yet - ); - wal_header.checksum_1 = checksums.0; - wal_header.checksum_2 = checksums.1; - let c = sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; - let header = Arc::new(SpinLock::new(wal_header)); - let checksum = { - let checksum = header.lock(); - (checksum.checksum_1, checksum.checksum_2) - }; - io.wait_for_completion(c)?; - tracing::debug!("new_shared(header={:?})", header); let read_locks = array::from_fn(|_| TursoRwLock::new()); // slot zero is always zero as it signifies that reads can be done from the db file // directly, and slot 1 is the default read mark containing the max frame. in this case @@ -1785,7 +1784,7 @@ impl WalFileShared { max_frame: AtomicU64::new(0), nbackfills: AtomicU64::new(0), frame_cache: Arc::new(SpinLock::new(HashMap::new())), - last_checksum: checksum, + last_checksum: (0, 0), file, pages_in_frames: Arc::new(SpinLock::new(Vec::new())), read_locks, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 623b10d12..3217de181 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -6785,8 +6785,9 @@ pub fn op_open_ephemeral( let page_size = pager .io .block(|| pager.with_header(|header| header.page_size)) - .unwrap_or_default() - .get() as usize; + .unwrap_or_default(); + + pager.page_size.set(Some(page_size)); state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager }; } From a2a88e2c694d4035f9411f90ad9d5c1f018088ae Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Wed, 13 Aug 2025 18:18:31 +0300 Subject: [PATCH 06/12] Make exception for page size literal value 1 --- core/storage/sqlite3_ondisk.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 63fbf3c6e..8b887ccfd 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -85,11 +85,15 @@ pub const LEFT_CHILD_PTR_SIZE_BYTES: usize = 4; pub struct PageSize(U16BE); impl PageSize { + pub const ALIAS_64K: u32 = 1; pub const MIN: u32 = 512; pub const MAX: u32 = 65536; pub const DEFAULT: u16 = 4096; pub const fn new(size: u32) -> Option { + if size == Self::ALIAS_64K { + return Some(Self(U16BE::new(1))); + } if size < PageSize::MIN || size > PageSize::MAX { return None; } From 8a1c3390e6a0721e5ae2c8d47e2e639e9eaf2a46 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 12 Aug 2025 21:29:31 +0300 Subject: [PATCH 07/12] Add integration test for page_size=512 --- tests/integration/pragma.rs | 104 ++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/tests/integration/pragma.rs b/tests/integration/pragma.rs index 14559d11e..533e06c22 100644 --- a/tests/integration/pragma.rs +++ b/tests/integration/pragma.rs @@ -57,3 +57,107 @@ fn test_pragma_module_list_generate_series() { assert!(found, "generate_series should appear in module_list"); } + +#[test] +fn test_pragma_page_sizes_without_writes_persists() { + for test_page_size in [512, 1024, 2048, 4096, 8192, 16384, 32768, 65536] { + let db = TempDatabase::new_empty(false); + { + let conn = db.connect_limbo(); + let pragma_query = format!("PRAGMA page_size={test_page_size}"); + conn.execute(&pragma_query).unwrap(); + conn.execute("PRAGMA user_version=1").unwrap(); // even sqlite behavior is that just changing page_size pragma doesn't persist it, so we do this to make a minimal persistent change + } + + let conn = db.connect_limbo(); + let mut rows = conn.query("PRAGMA page_size").unwrap().unwrap(); + let StepResult::Row = rows.step().unwrap() else { + panic!("expected row"); + }; + let row = rows.row().unwrap(); + let Value::Integer(page_size) = row.get_value(0) else { + panic!("expected integer value"); + }; + assert_eq!(*page_size, test_page_size); + + // Reopen database and verify page size + let db = TempDatabase::new_with_existent(&db.path, false); + let conn = db.connect_limbo(); + let mut rows = conn.query("PRAGMA page_size").unwrap().unwrap(); + let StepResult::Row = rows.step().unwrap() else { + panic!("expected row"); + }; + let row = rows.row().unwrap(); + let Value::Integer(page_size) = row.get_value(0) else { + panic!("expected integer value"); + }; + assert_eq!(*page_size, test_page_size); + } +} + +#[test] +fn test_pragma_page_sizes_with_writes_persists() { + for test_page_size in [512, 1024, 2048, 4096, 8192, 16384, 32768, 65536] { + let db = TempDatabase::new_empty(false); + { + { + let conn = db.connect_limbo(); + let pragma_query = format!("PRAGMA page_size={test_page_size}"); + conn.execute(&pragma_query).unwrap(); + + // Create table and insert data + conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + conn.execute("INSERT INTO test (id, value) VALUES (1, 'test data')") + .unwrap(); + let mut page_size = conn.pragma_query("page_size").unwrap(); + let mut page_size = page_size.pop().unwrap(); + let page_size = page_size.pop().unwrap(); + let Value::Integer(page_size) = page_size else { + panic!("expected integer value"); + }; + assert_eq!(page_size, test_page_size); + } // Connection is dropped here + + // Reopen database and verify page size and data + let conn = db.connect_limbo(); + + // Check page size is still test_page_size + let mut page_size = conn.pragma_query("page_size").unwrap(); + let mut page_size = page_size.pop().unwrap(); + let page_size = page_size.pop().unwrap(); + let Value::Integer(page_size) = page_size else { + panic!("expected integer value"); + }; + assert_eq!(page_size, test_page_size); + + // Verify data can still be read + let mut rows = conn + .query("SELECT value FROM test WHERE id = 1") + .unwrap() + .unwrap(); + loop { + if let StepResult::Row = rows.step().unwrap() { + let row = rows.row().unwrap(); + let Value::Text(value) = row.get_value(0) else { + panic!("expected text value"); + }; + assert_eq!(value.as_str(), "test data"); + break; + } + rows.run_once().unwrap(); + } + } + + // Drop the db and reopen it, and verify the same + let db = TempDatabase::new_with_existent(&db.path, false); + let conn = db.connect_limbo(); + let mut page_size = conn.pragma_query("page_size").unwrap(); + let mut page_size = page_size.pop().unwrap(); + let page_size = page_size.pop().unwrap(); + let Value::Integer(page_size) = page_size else { + panic!("expected integer value"); + }; + assert_eq!(page_size, test_page_size); + } +} From 38bb0719cc2d943115125e9c0be3ca4e97ec1d45 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 14 Aug 2025 09:51:50 +0300 Subject: [PATCH 08/12] read from disk tweak --- core/lib.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 5fbf8b751..230150667 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -440,14 +440,19 @@ impl Database { self.db_state.is_initialized(), "read_page_size_from_db_header called on uninitialized database" ); - let buf = Arc::new(Buffer::new_temporary(PageSize::MIN as usize)); // this needs to be a block multiple for O_DIRECT + turso_assert!( + PageSize::MIN % 512 == 0, + "header read must be a multiple of 512 for O_DIRECT" + ); + let buf = Arc::new(Buffer::new_temporary(PageSize::MIN as usize)); let c = Completion::new_read(buf.clone(), move |_buf, _| {}); let c = self.db_file.read_header(c)?; self.io.wait_for_completion(c)?; let page_size = u16::from_be_bytes(buf.as_slice()[16..18].try_into().unwrap()); - Ok(PageSize::new(page_size as u32).unwrap_or_else(|| { - panic!("invalid page size in DB header: {page_size}"); - })) + let Some(page_size) = PageSize::new(page_size as u32) else { + bail_corrupt_error!("invalid page size in DB header: {page_size}"); + }; + Ok(page_size) } /// Read the page size in order of preference: From f94fa2bbbe1e584a32b8f4f9180313ed094ad3af Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 14 Aug 2025 09:52:37 +0300 Subject: [PATCH 09/12] salt tweak --- core/storage/wal.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 61a42543d..63c0fbde5 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1313,9 +1313,8 @@ impl WalFile { if hdr.page_size == 0 { hdr.page_size = page_size.get(); } - if hdr.salt_1 == 0 { + if hdr.salt_1 == 0 && hdr.salt_2 == 0 { hdr.salt_1 = self.io.generate_random_number() as u32; - turso_assert!(hdr.salt_2 == 0, "salt_2 must be zero"); hdr.salt_2 = self.io.generate_random_number() as u32; } From 0c6d548402296074143206d4e108b1bbc98ea8e0 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 14 Aug 2025 09:54:53 +0300 Subject: [PATCH 10/12] integration test tweak --- tests/integration/pragma.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/integration/pragma.rs b/tests/integration/pragma.rs index 533e06c22..cb22bb2cd 100644 --- a/tests/integration/pragma.rs +++ b/tests/integration/pragma.rs @@ -110,6 +110,9 @@ fn test_pragma_page_sizes_with_writes_persists() { .unwrap(); conn.execute("INSERT INTO test (id, value) VALUES (1, 'test data')") .unwrap(); + // Insert a big blob just as a small smoke test that our btree handles this well with different page sizes. + conn.execute("INSERT INTO test (id, value) VALUES (2, randomblob(1024*1024))") + .unwrap(); let mut page_size = conn.pragma_query("page_size").unwrap(); let mut page_size = page_size.pop().unwrap(); let page_size = page_size.pop().unwrap(); From c2e89f94f8f209e962178b1fa1dc371823cb030d Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 14 Aug 2025 09:59:25 +0300 Subject: [PATCH 11/12] Change more page size panics to corrupt errors --- core/lib.rs | 14 ++++++++------ core/storage/wal.rs | 9 +++++---- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 230150667..57d25cf28 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -470,9 +470,10 @@ impl Database { if let Some(shared_wal) = maybe_shared_wal { let size_in_wal = shared_wal.page_size(); if size_in_wal != 0 { - return Ok(PageSize::new(size_in_wal).unwrap_or_else(|| { - panic!("invalid page size in WAL: {size_in_wal}"); - })); + let Some(page_size) = PageSize::new(size_in_wal) else { + bail_corrupt_error!("invalid page size in WAL: {size_in_wal}"); + }; + return Ok(page_size); } } if self.db_state.is_initialized() { @@ -481,9 +482,10 @@ impl Database { let Some(size) = requested_page_size else { return Ok(PageSize::default()); }; - Ok(PageSize::new(size as u32).unwrap_or_else(|| { - panic!("invalid requested page size: {size}"); - })) + let Some(page_size) = PageSize::new(size as u32) else { + bail_corrupt_error!("invalid requested page size: {size}"); + }; + Ok(page_size) } } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 63c0fbde5..a8f72b963 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -19,7 +19,7 @@ use crate::storage::sqlite3_ondisk::{ write_pages_vectored, PageSize, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::types::{IOCompletions, IOResult}; -use crate::{turso_assert, Buffer, LimboError, Result}; +use crate::{bail_corrupt_error, turso_assert, Buffer, LimboError, Result}; use crate::{Completion, Page}; use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE}; @@ -956,9 +956,10 @@ impl Wal for WalFile { db_size: u64, page: &[u8], ) -> Result<()> { - self.ensure_header_if_needed(PageSize::new(page.len() as u32).unwrap_or_else(|| { - panic!("invalid page size: {}", page.len()); - }))?; + let Some(page_size) = PageSize::new(page.len() as u32) else { + bail_corrupt_error!("invalid page size: {}", page.len()); + }; + self.ensure_header_if_needed(page_size)?; tracing::debug!("write_raw_frame({})", frame_id); if page.len() != self.page_size() as usize { return Err(LimboError::InvalidArgument(format!( From bd8c6f3c7c2744a8a88faeca3200004ab2420b31 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 14 Aug 2025 10:19:19 +0300 Subject: [PATCH 12/12] make PageSize more robust: only accept literal '1' value if it comes directly from db header --- core/lib.rs | 4 +--- core/storage/sqlite3_ondisk.rs | 23 ++++++++++++++++++----- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 57d25cf28..c0dc2e90d 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -449,9 +449,7 @@ impl Database { let c = self.db_file.read_header(c)?; self.io.wait_for_completion(c)?; let page_size = u16::from_be_bytes(buf.as_slice()[16..18].try_into().unwrap()); - let Some(page_size) = PageSize::new(page_size as u32) else { - bail_corrupt_error!("invalid page size in DB header: {page_size}"); - }; + let page_size = PageSize::new_from_header_u16(page_size)?; Ok(page_size) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 8b887ccfd..1c229617e 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -62,7 +62,7 @@ use crate::storage::database::DatabaseStorage; use crate::storage::pager::Pager; use crate::storage::wal::{PendingFlush, READMARK_NOT_USED}; use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; -use crate::{turso_assert, File, Result, WalFileShared}; +use crate::{bail_corrupt_error, turso_assert, File, Result, WalFileShared}; use std::cell::{RefCell, UnsafeCell}; use std::collections::{BTreeMap, HashMap}; use std::mem::MaybeUninit; @@ -85,15 +85,12 @@ pub const LEFT_CHILD_PTR_SIZE_BYTES: usize = 4; pub struct PageSize(U16BE); impl PageSize { - pub const ALIAS_64K: u32 = 1; pub const MIN: u32 = 512; pub const MAX: u32 = 65536; pub const DEFAULT: u16 = 4096; + /// Interpret a user-provided u32 as either a valid page size or None. pub const fn new(size: u32) -> Option { - if size == Self::ALIAS_64K { - return Some(Self(U16BE::new(1))); - } if size < PageSize::MIN || size > PageSize::MAX { return None; } @@ -104,12 +101,28 @@ impl PageSize { } if size == PageSize::MAX { + // Internally, the value 1 represents 65536, since the on-disk value of the page size in the DB header is 2 bytes. return Some(Self(U16BE::new(1))); } Some(Self(U16BE::new(size as u16))) } + /// Interpret a u16 on disk (DB file header) as either a valid page size or + /// return a corrupt error. + pub fn new_from_header_u16(value: u16) -> Result { + match value { + 1 => Ok(Self(U16BE::new(1))), + n => { + let Some(size) = Self::new(n as u32) else { + bail_corrupt_error!("invalid page size in database header: {n}"); + }; + + Ok(size) + } + } + } + pub const fn get(self) -> u32 { match self.0.get() { 1 => Self::MAX,