From cf126824de115801c6583b906aa7e89e1fda489d Mon Sep 17 00:00:00 2001 From: meteorgan Date: Thu, 10 Jul 2025 17:54:44 +0800 Subject: [PATCH 1/8] Support set page size --- core/lib.rs | 213 ++++++++++++++++++-------------- core/storage/btree.rs | 12 +- core/storage/header_accessor.rs | 6 +- core/storage/pager.rs | 22 ++-- core/storage/sqlite3_ondisk.rs | 8 +- core/storage/wal.rs | 4 + core/translate/pragma.rs | 18 ++- core/vdbe/execute.rs | 7 +- 8 files changed, 167 insertions(+), 123 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 130503ede..645a71e53 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -77,7 +77,7 @@ use std::{ use storage::database::DatabaseFile; use storage::page_cache::DumbLruPageCache; pub use storage::pager::PagerCacheflushStatus; -use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNITIALIZED}; +use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED}; pub use storage::{ buffer_pool::BufferPool, database::DatabaseStorage, @@ -117,7 +117,7 @@ pub struct Database { // create DB connections. _shared_page_cache: Arc>, maybe_shared_wal: RwLock>>>, - is_empty: Arc, + db_state: Arc, init_lock: Arc>, open_flags: OpenFlags, } @@ -192,8 +192,9 @@ impl Database { .as_ref() .is_some_and(|wal| unsafe { &*wal.get() }.max_frame.load(Ordering::SeqCst) > 0); - let is_empty = if db_size == 0 && !wal_has_frames { - DB_STATE_UNITIALIZED + // No pages in DB file or WAL -> empty database + let db_state = if db_size == 0 && !wal_has_frames { + DB_STATE_UNINITIALIZED } else { DB_STATE_INITIALIZED }; @@ -209,13 +210,13 @@ impl Database { db_file, io: io.clone(), open_flags: flags, - is_empty: Arc::new(AtomicUsize::new(is_empty)), + db_state: Arc::new(AtomicUsize::new(db_state)), init_lock: Arc::new(Mutex::new(())), }; let db = Arc::new(db); // Check: https://github.com/tursodatabase/turso/pull/1761#discussion_r2154013123 - if is_empty == 2 { + if db_state == DB_STATE_INITIALIZED { // parse schema let conn = db.connect()?; let schema_version = get_schema_version(&conn)?; @@ -239,90 +240,16 @@ impl Database { } pub fn connect(self: &Arc) -> Result> { - let buffer_pool = Arc::new(BufferPool::new(None)); + let pager = self.init_pager(None)?; - // Open existing WAL file if present - if let Some(shared_wal) = self.maybe_shared_wal.read().clone() { - // No pages in DB file or WAL -> empty database - let is_empty = self.is_empty.clone(); - let wal = Rc::new(RefCell::new(WalFile::new( - self.io.clone(), - shared_wal, - buffer_pool.clone(), - ))); - let pager = Rc::new(Pager::new( - self.db_file.clone(), - wal, - self.io.clone(), - Arc::new(RwLock::new(DumbLruPageCache::default())), - buffer_pool, - is_empty, - self.init_lock.clone(), - )?); - - let page_size = header_accessor::get_page_size(&pager) - .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) - as u32; - let default_cache_size = header_accessor::get_default_page_cache_size(&pager) - .unwrap_or(storage::sqlite3_ondisk::DEFAULT_CACHE_SIZE); - pager.buffer_pool.set_page_size(page_size as usize); - let conn = Arc::new(Connection { - _db: self.clone(), - pager: pager.clone(), - schema: RefCell::new(self.schema.read().clone()), - last_insert_rowid: Cell::new(0), - auto_commit: Cell::new(true), - mv_transactions: RefCell::new(Vec::new()), - transaction_state: Cell::new(TransactionState::None), - last_change: Cell::new(0), - syms: RefCell::new(SymbolTable::new()), - total_changes: Cell::new(0), - _shared_cache: false, - cache_size: Cell::new(default_cache_size), - readonly: Cell::new(false), - wal_checkpoint_disabled: Cell::new(false), - capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), - closed: Cell::new(false), - }); - if let Err(e) = conn.register_builtins() { - return Err(LimboError::ExtensionError(e)); - } - return Ok(conn); - }; - - // No existing WAL; create one. - // TODO: currently Pager needs to be instantiated with some implementation of trait Wal, so here's a workaround. - let dummy_wal = Rc::new(RefCell::new(DummyWAL {})); - let is_empty = self.is_empty.clone(); - let mut pager = Pager::new( - self.db_file.clone(), - dummy_wal, - self.io.clone(), - Arc::new(RwLock::new(DumbLruPageCache::default())), - buffer_pool.clone(), - is_empty, - Arc::new(Mutex::new(())), - )?; let page_size = header_accessor::get_page_size(&pager) .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as u32; let default_cache_size = header_accessor::get_default_page_cache_size(&pager) .unwrap_or(storage::sqlite3_ondisk::DEFAULT_CACHE_SIZE); - let wal_path = format!("{}-wal", self.path); - let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?; - let real_shared_wal = WalFileShared::new_shared(page_size, &self.io, file)?; - // Modify Database::maybe_shared_wal to point to the new WAL file so that other connections - // can open the existing WAL. - *self.maybe_shared_wal.write() = Some(real_shared_wal.clone()); - let wal = Rc::new(RefCell::new(WalFile::new( - self.io.clone(), - real_shared_wal, - buffer_pool, - ))); - pager.set_wal(wal); let conn = Arc::new(Connection { _db: self.clone(), - pager: Rc::new(pager), + pager: RefCell::new(Rc::new(pager)), schema: RefCell::new(self.schema.read().clone()), auto_commit: Cell::new(true), mv_transactions: RefCell::new(Vec::new()), @@ -333,6 +260,7 @@ impl Database { syms: RefCell::new(SymbolTable::new()), _shared_cache: false, cache_size: Cell::new(default_cache_size), + page_size: Cell::new(page_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), @@ -345,6 +273,78 @@ impl Database { Ok(conn) } + fn init_pager(&self, page_size: Option) -> Result { + // Open existing WAL file if present + if let Some(shared_wal) = self.maybe_shared_wal.read().clone() { + let size = match page_size { + None => unsafe { (*shared_wal.get()).page_size() as usize }, + Some(size) => { + unsafe { (*shared_wal.get()).set_page_size(size as u32) }; + size + } + }; + let buffer_pool = Arc::new(BufferPool::new(Some(size))); + + let db_state = self.db_state.clone(); + let wal = Rc::new(RefCell::new(WalFile::new( + self.io.clone(), + shared_wal, + buffer_pool.clone(), + ))); + let pager = Pager::new( + self.db_file.clone(), + wal, + self.io.clone(), + Arc::new(RwLock::new(DumbLruPageCache::default())), + buffer_pool.clone(), + db_state, + self.init_lock.clone(), + )?; + return Ok(pager); + } + + let buffer_pool = Arc::new(BufferPool::new(page_size)); + // No existing WAL; create one. + // TODO: currently Pager needs to be instantiated with some implementation of trait Wal, so here's a workaround. + let dummy_wal = Rc::new(RefCell::new(DummyWAL {})); + let db_state = self.db_state.clone(); + let mut pager = Pager::new( + self.db_file.clone(), + dummy_wal, + self.io.clone(), + Arc::new(RwLock::new(DumbLruPageCache::default())), + buffer_pool.clone(), + db_state, + Arc::new(Mutex::new(())), + )?; + + let size = match page_size { + Some(size) => size as u32, + None => { + let size = header_accessor::get_page_size(&pager) + .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) + as u32; + buffer_pool.set_page_size(size as usize); + size + } + }; + + let wal_path = format!("{}-wal", self.path); + let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?; + let real_shared_wal = WalFileShared::new_shared(size, &self.io, file)?; + // Modify Database::maybe_shared_wal to point to the new WAL file so that other connections + // can open the existing WAL. + *self.maybe_shared_wal.write() = Some(real_shared_wal.clone()); + let wal = Rc::new(RefCell::new(WalFile::new( + self.io.clone(), + real_shared_wal, + buffer_pool, + ))); + pager.set_wal(wal); + + Ok(pager) + } + /// Open a new database file with optionally specifying a VFS without an existing database /// connection and symbol table to register extensions. #[cfg(feature = "fs")] @@ -499,7 +499,7 @@ impl CaptureDataChangesMode { pub struct Connection { _db: Arc, - pager: Rc, + pager: RefCell>, schema: RefCell, /// Whether to automatically commit transaction auto_commit: Cell, @@ -511,6 +511,7 @@ pub struct Connection { syms: RefCell, _shared_cache: bool, cache_size: Cell, + page_size: Cell, readonly: Cell, wal_checkpoint_disabled: Cell, capture_data_changes: RefCell, @@ -545,7 +546,7 @@ impl Connection { let program = Rc::new(translate::translate( self.schema.borrow().deref(), stmt, - self.pager.clone(), + self.pager.borrow().clone(), self.clone(), &syms, QueryMode::Normal, @@ -554,7 +555,7 @@ impl Connection { Ok(Statement::new( program, self._db.mv_store.clone(), - self.pager.clone(), + self.pager.borrow().clone(), )) } Cmd::Explain(_stmt) => todo!(), @@ -596,7 +597,7 @@ impl Connection { let program = translate::translate( self.schema.borrow().deref(), stmt.clone(), - self.pager.clone(), + self.pager.borrow().clone(), self.clone(), &syms, cmd.into(), @@ -605,7 +606,7 @@ impl Connection { let stmt = Statement::new( program.into(), self._db.mv_store.clone(), - self.pager.clone(), + self.pager.borrow().clone(), ); Ok(Some(stmt)) } @@ -656,7 +657,7 @@ impl Connection { let program = translate::translate( self.schema.borrow().deref(), stmt, - self.pager.clone(), + self.pager.borrow().clone(), self.clone(), &syms, QueryMode::Explain, @@ -669,7 +670,7 @@ impl Connection { let program = translate::translate( self.schema.borrow().deref(), stmt, - self.pager.clone(), + self.pager.borrow().clone(), self.clone(), &syms, QueryMode::Normal, @@ -682,7 +683,7 @@ impl Connection { let res = program.step( &mut state, self._db.mv_store.clone(), - self.pager.clone(), + self.pager.borrow().clone(), )?; if matches!(res, StepResult::Done) { break; @@ -703,7 +704,7 @@ impl Connection { if res.is_err() { let state = self.transaction_state.get(); if let TransactionState::Write { schema_did_change } = state { - self.pager.rollback(schema_did_change, self)? + self.pager.borrow().rollback(schema_did_change, self)? } } res @@ -750,7 +751,7 @@ impl Connection { } pub fn wal_frame_count(&self) -> Result { - self.pager.wal_frame_count() + self.pager.borrow().wal_frame_count() } pub fn wal_get_frame( @@ -759,7 +760,9 @@ impl Connection { p_frame: *mut u8, frame_len: u32, ) -> Result> { - self.pager.wal_get_frame(frame_no, p_frame, frame_len) + self.pager + .borrow() + .wal_get_frame(frame_no, p_frame, frame_len) } /// Flush dirty pages to disk. @@ -770,11 +773,13 @@ impl Connection { if self.closed.get() { return Err(LimboError::InternalError("Connection closed".to_string())); } - self.pager.cacheflush(self.wal_checkpoint_disabled.get()) + self.pager + .borrow() + .cacheflush(self.wal_checkpoint_disabled.get()) } pub fn clear_page_cache(&self) -> Result<()> { - self.pager.clear_page_cache(); + self.pager.borrow().clear_page_cache(); Ok(()) } @@ -783,6 +788,7 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } self.pager + .borrow() .wal_checkpoint(self.wal_checkpoint_disabled.get()) } @@ -793,6 +799,7 @@ impl Connection { } self.closed.set(true); self.pager + .borrow() .checkpoint_shutdown(self.wal_checkpoint_disabled.get()) } @@ -831,6 +838,22 @@ impl Connection { pub fn set_capture_data_changes(&self, opts: CaptureDataChangesMode) { self.capture_data_changes.replace(opts); } + pub fn get_page_size(&self) -> u32 { + self.page_size.get() + } + + /// Reset the page size for the current connection. Can only be called when db is uninitialized. + pub fn reset_page_size(&self, size: u32) -> Result<()> { + if self._db.db_state.load(Ordering::SeqCst) != DB_STATE_UNINITIALIZED { + return Ok(()); + } + + self.page_size.set(size); + let pager = self._db.init_pager(Some(size as usize))?; + self.pager.replace(Rc::new(pager)); + + Ok(()) + } #[cfg(feature = "fs")] pub fn open_new(&self, path: &str, vfs: &str) -> Result<(Arc, Arc)> { diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 09abcc2dd..2bddc979d 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -6593,7 +6593,7 @@ mod tests { &mut payload, &record, 4096, - conn.pager.clone(), + conn.pager.borrow().clone(), ); insert_into_cell(page, &payload, pos, 4096).unwrap(); payload @@ -6831,7 +6831,7 @@ mod tests { let io: Arc = Arc::new(MemoryIO::new()); let db = Database::open_file(io.clone(), "test.db", false, false).unwrap(); let conn = db.connect().unwrap(); - let pager = conn.pager.clone(); + let pager = conn.pager.borrow().clone(); // FIXME: handle page cache is full let _ = run_until_done(|| pager.allocate_page1(), &pager); @@ -7717,7 +7717,7 @@ mod tests { &mut payload, &record, 4096, - conn.pager.clone(), + conn.pager.borrow().clone(), ); if (free as usize) < payload.len() + 2 { // do not try to insert overflow pages because they require balancing @@ -7790,7 +7790,7 @@ mod tests { &mut payload, &record, 4096, - conn.pager.clone(), + conn.pager.borrow().clone(), ); if (free as usize) < payload.len() - 2 { // do not try to insert overflow pages because they require balancing @@ -8154,7 +8154,7 @@ mod tests { &mut payload, &record, 4096, - conn.pager.clone(), + conn.pager.borrow().clone(), ); let page = page.get(); insert(0, page.get_contents()); @@ -8231,7 +8231,7 @@ mod tests { &mut payload, &record, 4096, - conn.pager.clone(), + conn.pager.borrow().clone(), ); insert_into_cell(page.get().get_contents(), &payload, 0, 4096).unwrap(); let free = compute_free_space(page.get().get_contents(), usable_space); diff --git a/core/storage/header_accessor.rs b/core/storage/header_accessor.rs index d1d7c726a..1e1c1c48c 100644 --- a/core/storage/header_accessor.rs +++ b/core/storage/header_accessor.rs @@ -35,7 +35,7 @@ const HEADER_OFFSET_VERSION_NUMBER: usize = 96; // Helper to get a read-only reference to the header page. fn get_header_page(pager: &Pager) -> Result> { - if pager.is_empty.load(Ordering::SeqCst) < 2 { + if pager.db_state.load(Ordering::SeqCst) < 2 { return Err(LimboError::InternalError( "Database is empty, header does not exist - page 1 should've been allocated before this".to_string(), )); @@ -49,7 +49,7 @@ fn get_header_page(pager: &Pager) -> Result> { // Helper to get a writable reference to the header page and mark it dirty. fn get_header_page_for_write(pager: &Pager) -> Result> { - if pager.is_empty.load(Ordering::SeqCst) < 2 { + if pager.db_state.load(Ordering::SeqCst) < 2 { // This should not be called on an empty DB for writing, as page 1 is allocated on first transaction. return Err(LimboError::InternalError( "Cannot write to header of an empty database - page 1 should've been allocated before this".to_string(), @@ -103,7 +103,7 @@ macro_rules! impl_header_field_accessor { // Async version #[allow(dead_code)] pub fn [](pager: &Pager) -> Result> { - if pager.is_empty.load(Ordering::SeqCst) < 2 { + if pager.db_state.load(Ordering::SeqCst) < 2 { return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this"))); } let page = match get_header_page(pager)? { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 67499ee86..c5918ca4c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -6,8 +6,8 @@ use crate::storage::header_accessor; use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType}; use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus}; use crate::types::CursorResult; +use crate::Completion; use crate::{Buffer, Connection, LimboError, Result}; -use crate::{Completion, WalFile}; use parking_lot::RwLock; use std::cell::{OnceCell, RefCell, UnsafeCell}; use std::collections::HashSet; @@ -191,7 +191,7 @@ pub enum AutoVacuumMode { Incremental, } -pub const DB_STATE_UNITIALIZED: usize = 0; +pub const DB_STATE_UNINITIALIZED: usize = 0; pub const DB_STATE_INITIALIZING: usize = 1; pub const DB_STATE_INITIALIZED: usize = 2; /// The pager interface implements the persistence layer by providing access @@ -218,7 +218,7 @@ pub struct Pager { /// 0 -> Database is empty, /// 1 -> Database is being initialized, /// 2 -> Database is initialized and ready for use. - pub is_empty: Arc, + pub db_state: Arc, /// Mutex for synchronizing database initialization to prevent race conditions init_lock: Arc>, allocate_page1_state: RefCell, @@ -265,10 +265,10 @@ impl Pager { io: Arc, page_cache: Arc>, buffer_pool: Arc, - is_empty: Arc, + db_state: Arc, init_lock: Arc>, ) -> Result { - let allocate_page1_state = if is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { + let allocate_page1_state = if db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { RefCell::new(AllocatePage1State::Start) } else { RefCell::new(AllocatePage1State::Done) @@ -288,7 +288,7 @@ impl Pager { checkpoint_inflight: Rc::new(RefCell::new(0)), buffer_pool, auto_vacuum_mode: RefCell::new(AutoVacuumMode::None), - is_empty, + db_state, init_lock, allocate_page1_state, page_size: OnceCell::new(), @@ -296,7 +296,7 @@ impl Pager { }) } - pub fn set_wal(&mut self, wal: Rc>) { + pub fn set_wal(&mut self, wal: Rc>) { self.wal = wal; } @@ -608,10 +608,10 @@ impl Pager { #[instrument(skip_all, level = Level::INFO)] fn maybe_allocate_page1(&self) -> Result> { - if self.is_empty.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { + if self.db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { if let Ok(_lock) = self.init_lock.try_lock() { match ( - self.is_empty.load(Ordering::SeqCst), + self.db_state.load(Ordering::SeqCst), self.allocating_page1(), ) { // In case of being empty or (allocating and this connection is performing allocation) then allocate the first page @@ -1054,7 +1054,7 @@ impl Pager { match state { AllocatePage1State::Start => { tracing::trace!("allocate_page1(Start)"); - self.is_empty.store(DB_STATE_INITIALIZING, Ordering::SeqCst); + self.db_state.store(DB_STATE_INITIALIZING, Ordering::SeqCst); let mut default_header = DatabaseHeader::default(); default_header.database_size += 1; let page = allocate_page(1, &self.buffer_pool, 0); @@ -1100,7 +1100,7 @@ impl Pager { cache.insert(page_key, page1_ref.clone()).map_err(|e| { LimboError::InternalError(format!("Failed to insert page 1 into cache: {e:?}")) })?; - self.is_empty.store(DB_STATE_INITIALIZED, Ordering::SeqCst); + self.db_state.store(DB_STATE_INITIALIZED, Ordering::SeqCst); self.allocate_page1_state.replace(AllocatePage1State::Done); Ok(CursorResult::Ok(page1_ref.clone())) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index af5438859..67a4be5ec 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -83,7 +83,7 @@ pub const MIN_PAGE_CACHE_SIZE: usize = 10; pub const MIN_PAGE_SIZE: u32 = 512; /// The maximum page size in bytes. -const MAX_PAGE_SIZE: u32 = 65536; +pub const MAX_PAGE_SIZE: u32 = 65536; /// The default page size in bytes. pub const DEFAULT_PAGE_SIZE: u16 = 4096; @@ -279,7 +279,7 @@ impl Default for DatabaseHeader { impl DatabaseHeader { pub fn update_page_size(&mut self, size: u32) { - if !(MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&size) || (size & (size - 1) != 0) { + if !is_valid_page_size(size) { return; } @@ -299,6 +299,10 @@ impl DatabaseHeader { } } +pub fn is_valid_page_size(size: u32) -> bool { + (MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&size) && (size & (size - 1)) == 0 +} + pub fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) { buf[0..16].copy_from_slice(&header.magic); buf[16..18].copy_from_slice(&header.page_size.to_be_bytes()); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 45114fed4..884df632b 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1123,4 +1123,8 @@ impl WalFileShared { pub fn page_size(&self) -> u32 { self.wal_header.lock().page_size } + + pub fn set_page_size(&self, page_size: u32) { + self.wal_header.lock().page_size = page_size; + } } diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 45f554fc9..ce6490e99 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -9,7 +9,7 @@ use turso_sqlite3_parser::ast::{PragmaName, QualifiedName}; use crate::pragma::pragma_for; use crate::schema::Schema; use crate::storage::pager::AutoVacuumMode; -use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE; +use crate::storage::sqlite3_ondisk::{is_valid_page_size, MIN_PAGE_CACHE_SIZE}; use crate::storage::wal::CheckpointMode; use crate::translate::schema::translate_create_table; use crate::util::{normalize_ident, parse_signed_number, parse_string}; @@ -149,7 +149,13 @@ fn update_pragma( unreachable!(); } PragmaName::PageSize => { - bail_parse_error!("Updating database page size is not supported."); + let page_size = match parse_signed_number(&value)? { + Value::Integer(size) => size, + Value::Float(size) => size as i64, + _ => bail_parse_error!("Invalid value for page size pragma"), + }; + update_page_size(connection, page_size as u32)?; + Ok(program) } PragmaName::AutoVacuum => { let auto_vacuum_mode = match value { @@ -528,3 +534,11 @@ fn turso_cdc_table_columns() -> Vec { }, ] } +fn update_page_size(connection: Arc, page_size: u32) -> crate::Result<()> { + if !is_valid_page_size(page_size) { + return Ok(()); + } + + connection.reset_page_size(page_size)?; + Ok(()) +} diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index be9f666cb..d77869386 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -26,6 +26,7 @@ use crate::{ }, printf::exec_printf, }, + IO, }; use std::ops::DerefMut; use std::sync::atomic::AtomicUsize; @@ -55,9 +56,7 @@ use crate::{ vector::{vector32, vector64, vector_distance_cos, vector_distance_l2, vector_extract}, }; -use crate::{ - info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState, IO, -}; +use crate::{info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState}; use super::{ insn::{Cookie, RegisterOrLiteral}, @@ -5936,7 +5935,7 @@ pub fn op_open_ephemeral( OpOpenEphemeralState::Start => { tracing::trace!("Start"); let conn = program.connection.clone(); - let io = conn.pager.io.get_memory_io(); + let io = conn.pager.borrow().io.get_memory_io(); let file = io.open_file("", OpenFlags::Create, true)?; let db_file = Arc::new(FileMemoryStorage::new(file)); From a6faab17e93750c4b77f0242f8670ed6e701bba8 Mon Sep 17 00:00:00 2001 From: meteorgan Date: Thu, 10 Jul 2025 23:39:19 +0800 Subject: [PATCH 2/8] fix query page size --- core/lib.rs | 5 ++--- core/storage/btree.rs | 2 +- core/storage/header_accessor.rs | 20 +++++++++++++++++++- core/storage/pager.rs | 2 +- core/storage/sqlite3_ondisk.rs | 4 ++-- core/translate/pragma.rs | 3 +-- 6 files changed, 26 insertions(+), 10 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 645a71e53..1aa636c23 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -243,7 +243,7 @@ impl Database { let pager = self.init_pager(None)?; let page_size = header_accessor::get_page_size(&pager) - .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as u32; + .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE); let default_cache_size = header_accessor::get_default_page_cache_size(&pager) .unwrap_or(storage::sqlite3_ondisk::DEFAULT_CACHE_SIZE); @@ -322,8 +322,7 @@ impl Database { Some(size) => size as u32, None => { let size = header_accessor::get_page_size(&pager) - .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) - as u32; + .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE); buffer_pool.set_page_size(size as usize); size } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 2bddc979d..a26c84802 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -7368,7 +7368,7 @@ mod tests { pager.allocate_page().unwrap(); } - header_accessor::set_page_size(&pager, page_size as u16).unwrap(); + header_accessor::set_page_size(&pager, page_size).unwrap(); pager } diff --git a/core/storage/header_accessor.rs b/core/storage/header_accessor.rs index 1e1c1c48c..682a0ce61 100644 --- a/core/storage/header_accessor.rs +++ b/core/storage/header_accessor.rs @@ -1,3 +1,4 @@ +use crate::storage::sqlite3_ondisk::MAX_PAGE_SIZE; use crate::{ storage::{ self, @@ -158,7 +159,7 @@ macro_rules! impl_header_field_accessor { } // impl_header_field_accessor!(magic, [u8; 16], HEADER_OFFSET_MAGIC); -impl_header_field_accessor!(page_size, u16, HEADER_OFFSET_PAGE_SIZE); +impl_header_field_accessor!(page_size_, u16, HEADER_OFFSET_PAGE_SIZE); impl_header_field_accessor!(write_version, u8, HEADER_OFFSET_WRITE_VERSION); impl_header_field_accessor!(read_version, u8, HEADER_OFFSET_READ_VERSION); impl_header_field_accessor!(reserved_space, u8, HEADER_OFFSET_RESERVED_SPACE); @@ -193,3 +194,20 @@ impl_header_field_accessor!(application_id, u32, HEADER_OFFSET_APPLICATION_ID); //impl_header_field_accessor!(reserved_for_expansion, [u8; 20], HEADER_OFFSET_RESERVED_FOR_EXPANSION); impl_header_field_accessor!(version_valid_for, u32, HEADER_OFFSET_VERSION_VALID_FOR); impl_header_field_accessor!(version_number, u32, HEADER_OFFSET_VERSION_NUMBER); + +pub fn get_page_size(pager: &Pager) -> Result { + let size = get_page_size_(pager)?; + if size == 1 { + return Ok(MAX_PAGE_SIZE); + } + Ok(size as u32) +} + +pub fn set_page_size(pager: &Pager, value: u32) -> Result<()> { + let page_size = if value == MAX_PAGE_SIZE { + 1 + } else { + value as u16 + }; + set_page_size_(pager, page_size) +} diff --git a/core/storage/pager.rs b/core/storage/pager.rs index c5918ca4c..b88a92475 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -225,7 +225,7 @@ pub struct Pager { /// Cache page_size and reserved_space at Pager init and reuse for subsequent /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality /// to change it. - page_size: OnceCell, + page_size: OnceCell, reserved_space: OnceCell, } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 67a4be5ec..abbdc637c 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -86,7 +86,7 @@ pub const MIN_PAGE_SIZE: u32 = 512; pub const MAX_PAGE_SIZE: u32 = 65536; /// The default page size in bytes. -pub const DEFAULT_PAGE_SIZE: u16 = 4096; +pub const DEFAULT_PAGE_SIZE: u32 = 4096; pub const DATABASE_HEADER_PAGE_ID: usize = 1; @@ -251,7 +251,7 @@ impl Default for DatabaseHeader { fn default() -> Self { Self { magic: *b"SQLite format 3\0", - page_size: DEFAULT_PAGE_SIZE, + page_size: DEFAULT_PAGE_SIZE as u16, write_version: 2, read_version: 2, reserved_space: 0, diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index ce6490e99..815ed0f04 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -362,8 +362,7 @@ fn query_pragma( } PragmaName::PageSize => { program.emit_int( - header_accessor::get_page_size(&pager) - .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as i64, + header_accessor::get_page_size(&pager).unwrap_or(connection.get_page_size()) as i64, register, ); program.emit_result_row(register, 1); From bf69b86e94062d2746ac76cb4305f3cd28e05686 Mon Sep 17 00:00:00 2001 From: meteorgan Date: Fri, 11 Jul 2025 17:12:10 +0800 Subject: [PATCH 3/8] fix: not all pragma need transaction --- core/storage/header_accessor.rs | 1 + core/translate/pragma.rs | 45 ++++++++++++++++++--------------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/core/storage/header_accessor.rs b/core/storage/header_accessor.rs index 682a0ce61..766a122b0 100644 --- a/core/storage/header_accessor.rs +++ b/core/storage/header_accessor.rs @@ -203,6 +203,7 @@ pub fn get_page_size(pager: &Pager) -> Result { Ok(size as u32) } +#[allow(dead_code)] pub fn set_page_size(pager: &Pager, value: u32) -> Result<()> { let page_size = if value == MAX_PAGE_SIZE { 1 diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 815ed0f04..0680f53dc 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -22,6 +22,7 @@ use strum::IntoEnumIterator; use super::integrity_check::translate_integrity_check; use crate::storage::header_accessor; use crate::storage::pager::Pager; +use crate::translate::emitter::TransactionMode; fn list_pragmas(program: &mut ProgramBuilder) { for x in PragmaName::iter() { @@ -47,7 +48,6 @@ pub fn translate_pragma( approx_num_labels: 0, }; program.extend(&opts); - let mut write = false; if name.name.0.eq_ignore_ascii_case("pragma_list") { list_pragmas(&mut program); @@ -59,22 +59,16 @@ pub fn translate_pragma( Err(_) => bail_parse_error!("Not a valid pragma name"), }; - let mut program = match body { + let (mut program, mode) = match body { None => query_pragma(pragma, schema, None, pager, connection, program)?, Some(ast::PragmaBody::Equals(value) | ast::PragmaBody::Call(value)) => match pragma { PragmaName::TableInfo => { query_pragma(pragma, schema, Some(value), pager, connection, program)? } - _ => { - write = true; - update_pragma(pragma, schema, value, pager, connection, program)? - } + _ => update_pragma(pragma, schema, value, pager, connection, program)?, }, }; - program.epilogue(match write { - false => super::emitter::TransactionMode::Read, - true => super::emitter::TransactionMode::Write, - }); + program.epilogue(mode); Ok(program) } @@ -86,7 +80,7 @@ fn update_pragma( pager: Rc, connection: Arc, mut program: ProgramBuilder, -) -> crate::Result { +) -> crate::Result<(ProgramBuilder, TransactionMode)> { match pragma { PragmaName::CacheSize => { let cache_size = match parse_signed_number(&value)? { @@ -95,7 +89,7 @@ fn update_pragma( _ => bail_parse_error!("Invalid value for cache size pragma"), }; update_cache_size(cache_size, pager, connection)?; - Ok(program) + Ok((program, TransactionMode::None)) } PragmaName::JournalMode => query_pragma( PragmaName::JournalMode, @@ -105,7 +99,7 @@ fn update_pragma( connection, program, ), - PragmaName::LegacyFileFormat => Ok(program), + PragmaName::LegacyFileFormat => Ok((program, TransactionMode::None)), PragmaName::WalCheckpoint => query_pragma( PragmaName::WalCheckpoint, schema, @@ -136,7 +130,7 @@ fn update_pragma( value: version_value, p5: 1, }); - Ok(program) + Ok((program, TransactionMode::Write)) } PragmaName::SchemaVersion => { // TODO: Implement updating schema_version @@ -155,7 +149,7 @@ fn update_pragma( _ => bail_parse_error!("Invalid value for page size pragma"), }; update_page_size(connection, page_size as u32)?; - Ok(program) + Ok((program, TransactionMode::None)) } PragmaName::AutoVacuum => { let auto_vacuum_mode = match value { @@ -211,7 +205,7 @@ fn update_pragma( value: auto_vacuum_mode - 1, p5: 0, }); - Ok(program) + Ok((program, TransactionMode::None)) } PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"), PragmaName::UnstableCaptureDataChangesConn => { @@ -236,7 +230,7 @@ fn update_pragma( )?; } connection.set_capture_data_changes(opts); - Ok(program) + Ok((program, TransactionMode::Write)) } } } @@ -248,20 +242,22 @@ fn query_pragma( pager: Rc, connection: Arc, mut program: ProgramBuilder, -) -> crate::Result { +) -> crate::Result<(ProgramBuilder, TransactionMode)> { let register = program.alloc_register(); match pragma { PragmaName::CacheSize => { program.emit_int(connection.get_cache_size() as i64, register); program.emit_result_row(register, 1); program.add_pragma_result_column(pragma.to_string()); + Ok((program, TransactionMode::None)) } PragmaName::JournalMode => { program.emit_string8("wal".into(), register); program.emit_result_row(register, 1); program.add_pragma_result_column(pragma.to_string()); + Ok((program, TransactionMode::None)) } - PragmaName::LegacyFileFormat => {} + PragmaName::LegacyFileFormat => Ok((program, TransactionMode::None)), PragmaName::WalCheckpoint => { // Checkpoint uses 3 registers: P1, P2, P3. Ref Insn::Checkpoint for more info. // Allocate two more here as one was allocated at the top. @@ -288,6 +284,7 @@ fn query_pragma( dest: register, }); program.emit_result_row(register, 3); + Ok((program, TransactionMode::None)) } PragmaName::PageCount => { program.emit_insn(Insn::PageCount { @@ -296,6 +293,7 @@ fn query_pragma( }); program.emit_result_row(register, 1); program.add_pragma_result_column(pragma.to_string()); + Ok((program, TransactionMode::Read)) } PragmaName::TableInfo => { let table = match value { @@ -341,6 +339,7 @@ fn query_pragma( for name in col_names { program.add_pragma_result_column(name.into()); } + Ok((program, TransactionMode::None)) } PragmaName::UserVersion => { program.emit_insn(Insn::ReadCookie { @@ -350,6 +349,7 @@ fn query_pragma( }); program.add_pragma_result_column(pragma.to_string()); program.emit_result_row(register, 1); + Ok((program, TransactionMode::Read)) } PragmaName::SchemaVersion => { program.emit_insn(Insn::ReadCookie { @@ -359,6 +359,7 @@ fn query_pragma( }); program.add_pragma_result_column(pragma.to_string()); program.emit_result_row(register, 1); + Ok((program, TransactionMode::Read)) } PragmaName::PageSize => { program.emit_int( @@ -367,6 +368,7 @@ fn query_pragma( ); program.emit_result_row(register, 1); program.add_pragma_result_column(pragma.to_string()); + Ok((program, TransactionMode::None)) } PragmaName::AutoVacuum => { let auto_vacuum_mode = pager.get_auto_vacuum_mode(); @@ -383,9 +385,11 @@ fn query_pragma( value: auto_vacuum_mode_i64, }); program.emit_result_row(register, 1); + Ok((program, TransactionMode::None)) } PragmaName::IntegrityCheck => { translate_integrity_check(schema, &mut program)?; + Ok((program, TransactionMode::Read)) } PragmaName::UnstableCaptureDataChangesConn => { let pragma = pragma_for(pragma); @@ -400,10 +404,9 @@ fn query_pragma( program.emit_result_row(register, 2); program.add_pragma_result_column(pragma.columns[0].to_string()); program.add_pragma_result_column(pragma.columns[1].to_string()); + Ok((program, TransactionMode::Read)) } } - - Ok(program) } fn update_auto_vacuum_mode( From e2ab6736249808cbfdac0ea772098d2502f2ebdf Mon Sep 17 00:00:00 2001 From: meteorgan Date: Fri, 11 Jul 2025 17:54:35 +0800 Subject: [PATCH 4/8] fix self.pager.replace() panic --- core/lib.rs | 30 +++++++++++------------------- core/translate/pragma.rs | 2 +- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 1aa636c23..12d758005 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -540,22 +540,19 @@ impl Connection { .unwrap() .trim(); self.maybe_update_schema(); + let pager = self.pager.borrow().clone(); match cmd { Cmd::Stmt(stmt) => { let program = Rc::new(translate::translate( self.schema.borrow().deref(), stmt, - self.pager.borrow().clone(), + pager.clone(), self.clone(), &syms, QueryMode::Normal, input, )?); - Ok(Statement::new( - program, - self._db.mv_store.clone(), - self.pager.borrow().clone(), - )) + Ok(Statement::new(program, self._db.mv_store.clone(), pager)) } Cmd::Explain(_stmt) => todo!(), Cmd::ExplainQueryPlan(_stmt) => todo!(), @@ -591,22 +588,19 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } let syms = self.syms.borrow(); + let pager = self.pager.borrow().clone(); match cmd { Cmd::Stmt(ref stmt) | Cmd::Explain(ref stmt) => { let program = translate::translate( self.schema.borrow().deref(), stmt.clone(), - self.pager.borrow().clone(), + pager.clone(), self.clone(), &syms, cmd.into(), input, )?; - let stmt = Statement::new( - program.into(), - self._db.mv_store.clone(), - self.pager.borrow().clone(), - ); + let stmt = Statement::new(program.into(), self._db.mv_store.clone(), pager); Ok(Some(stmt)) } Cmd::ExplainQueryPlan(stmt) => { @@ -646,6 +640,7 @@ impl Connection { let mut parser = Parser::new(sql.as_bytes()); while let Some(cmd) = parser.next()? { let syms = self.syms.borrow(); + let pager = self.pager.borrow().clone(); let byte_offset_end = parser.offset(); let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end]) .unwrap() @@ -656,7 +651,7 @@ impl Connection { let program = translate::translate( self.schema.borrow().deref(), stmt, - self.pager.borrow().clone(), + pager, self.clone(), &syms, QueryMode::Explain, @@ -669,7 +664,7 @@ impl Connection { let program = translate::translate( self.schema.borrow().deref(), stmt, - self.pager.borrow().clone(), + pager.clone(), self.clone(), &syms, QueryMode::Normal, @@ -679,11 +674,8 @@ impl Connection { let mut state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len()); loop { - let res = program.step( - &mut state, - self._db.mv_store.clone(), - self.pager.borrow().clone(), - )?; + let res = + program.step(&mut state, self._db.mv_store.clone(), pager.clone())?; if matches!(res, StepResult::Done) { break; } diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 0680f53dc..d6736ef95 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -30,7 +30,7 @@ fn list_pragmas(program: &mut ProgramBuilder) { program.emit_result_row(register, 1); } program.add_pragma_result_column("pragma_list".into()); - program.epilogue(crate::translate::emitter::TransactionMode::None); + program.epilogue(TransactionMode::None); } #[allow(clippy::too_many_arguments)] From f123c77ee88237abbfea124cb87b1964f6f3e820 Mon Sep 17 00:00:00 2001 From: meteorgan Date: Fri, 11 Jul 2025 22:22:16 +0800 Subject: [PATCH 5/8] fix set page_size in pager --- core/lib.rs | 17 +++++++++++++++-- core/storage/pager.rs | 15 ++++++++++++--- core/translate/pragma.rs | 7 ++----- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 12d758005..540f81761 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -41,6 +41,7 @@ mod numeric; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; +use crate::storage::sqlite3_ondisk::is_valid_page_size; use crate::storage::{header_accessor, wal::DummyWAL}; use crate::translate::optimizer::optimize_plan; use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME; @@ -510,6 +511,8 @@ pub struct Connection { syms: RefCell, _shared_cache: bool, cache_size: Cell, + /// page size used for an uninitialized database or the next vacuum command. + /// it's not always equal to the current page size of the database page_size: Cell, readonly: Cell, wal_checkpoint_disabled: Cell, @@ -833,15 +836,25 @@ impl Connection { self.page_size.get() } - /// Reset the page size for the current connection. Can only be called when db is uninitialized. + /// Reset the page size for the current connection. + /// + /// Specifying a new page size does not change the page size immediately. + /// Instead, the new page size is remembered and is used to set the page size when the database + /// is first created, if it does not already exist when the page_size pragma is issued, + /// or at the next VACUUM command that is run on the same database connection while not in WAL mode. pub fn reset_page_size(&self, size: u32) -> Result<()> { - if self._db.db_state.load(Ordering::SeqCst) != DB_STATE_UNINITIALIZED { + if !is_valid_page_size(size) { return Ok(()); } self.page_size.set(size); + if self._db.db_state.load(Ordering::SeqCst) != DB_STATE_UNINITIALIZED { + return Ok(()); + } + let pager = self._db.init_pager(Some(size as usize))?; self.pager.replace(Rc::new(pager)); + self.pager.borrow().set_initial_page_size(size); Ok(()) } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index b88a92475..2b2d2cd32 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -225,7 +225,7 @@ pub struct Pager { /// Cache page_size and reserved_space at Pager init and reuse for subsequent /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality /// to change it. - page_size: OnceCell, + page_size: RefCell>, reserved_space: OnceCell, } @@ -291,7 +291,7 @@ impl Pager { db_state, init_lock, allocate_page1_state, - page_size: OnceCell::new(), + page_size: RefCell::new(None), reserved_space: OnceCell::new(), }) } @@ -586,7 +586,8 @@ impl Pager { pub fn usable_space(&self) -> usize { let page_size = *self .page_size - .get_or_init(|| header_accessor::get_page_size(self).unwrap_or_default()); + .borrow_mut() + .get_or_insert_with(|| header_accessor::get_page_size(self).unwrap_or_default()); let reserved_space = *self .reserved_space @@ -595,6 +596,11 @@ impl Pager { (page_size as usize) - (reserved_space as usize) } + /// Set the initial page size for the database. Should only be called before the database is initialized + pub fn set_initial_page_size(&self, size: u32) { + self.page_size.replace(Some(size)); + } + #[inline(always)] #[instrument(skip_all, level = Level::INFO)] pub fn begin_read_tx(&self) -> Result> { @@ -1057,6 +1063,9 @@ impl Pager { self.db_state.store(DB_STATE_INITIALIZING, Ordering::SeqCst); let mut default_header = DatabaseHeader::default(); default_header.database_size += 1; + if let Some(size) = *self.page_size.borrow() { + default_header.update_page_size(size); + } let page = allocate_page(1, &self.buffer_pool, 0); let contents = page.get_contents(); diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index d6736ef95..da99412ad 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -9,7 +9,7 @@ use turso_sqlite3_parser::ast::{PragmaName, QualifiedName}; use crate::pragma::pragma_for; use crate::schema::Schema; use crate::storage::pager::AutoVacuumMode; -use crate::storage::sqlite3_ondisk::{is_valid_page_size, MIN_PAGE_CACHE_SIZE}; +use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE; use crate::storage::wal::CheckpointMode; use crate::translate::schema::translate_create_table; use crate::util::{normalize_ident, parse_signed_number, parse_string}; @@ -536,11 +536,8 @@ fn turso_cdc_table_columns() -> Vec { }, ] } -fn update_page_size(connection: Arc, page_size: u32) -> crate::Result<()> { - if !is_valid_page_size(page_size) { - return Ok(()); - } +fn update_page_size(connection: Arc, page_size: u32) -> crate::Result<()> { connection.reset_page_size(page_size)?; Ok(()) } From 39d79d7420c9f2173378eadcff52cf88dc655ac1 Mon Sep 17 00:00:00 2001 From: meteorgan Date: Fri, 11 Jul 2025 23:05:58 +0800 Subject: [PATCH 6/8] add tests for page_size pragma --- testing/pragma.test | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/testing/pragma.test b/testing/pragma.test index ff42424bf..98c34265f 100755 --- a/testing/pragma.test +++ b/testing/pragma.test @@ -169,3 +169,35 @@ do_execsql_test pragma-function-sql-injection { SELECT * FROM pragma_table_info('sqlite_schema'';CREATE TABLE foo(c0);SELECT ''bar'); SELECT * FROM pragma_table_info('foo'); } {} + +do_execsql_test_on_specific_db ":memory:" pragma-page-size-default { + PRAGMA page_size +} {4096} + +do_execsql_test_on_specific_db ":memory:" pragma-page-size-set { + PRAGMA page_size=1024; + PRAGMA page_size +} {1024} + +# pragma page_size=xxx doesn't change the page size of an initialized database. +do_execsql_test_on_specific_db ":memory:" pragma-page-size-set-initialized-db { + CREATE TABLE "foo bar"(c0); + + PRAGMA page_size=1024; + PRAGMA page_size +} {4096} + +# pragma page_size=xxx changes the page size of an uninitialized database and persists the change. +set test_pragma_page_size_db "testing/testing_pragma_page_size.db" +# set user_version to trigger database initialization. +do_execsql_test_on_specific_db $test_pragma_page_size_db pragma-page-size-set-initialized-db-1 { + PRAGMA page_size=1024; + PRAGMA user_version=1; + PRAGMA page_size +} {1024} + +do_execsql_test_on_specific_db $test_pragma_page_size_db pragma-page-size-set-uninitialized-db-2 { + PRAGMA page_size +} {1024} +catch {file delete -force $test_pragma_page_size_db} +catch {file delete -force "${test_pragma_page_size_db}-wal"} From b42a1ef2728d786fa7fa5ab2d3d3897e5cec9ea7 Mon Sep 17 00:00:00 2001 From: meteorgan Date: Sat, 12 Jul 2025 17:50:54 +0800 Subject: [PATCH 7/8] minor improvements based on PR comments --- core/lib.rs | 13 ++++--------- core/storage/pager.rs | 11 ++++++----- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 540f81761..0e763a67a 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -179,7 +179,6 @@ impl Database { ) -> Result> { let wal_path = format!("{path}-wal"); let maybe_shared_wal = WalFileShared::open_shared_if_exists(&io, wal_path.as_str())?; - let db_size = db_file.size()?; let mv_store = if enable_mvcc { Some(Rc::new(MvStore::new( @@ -189,12 +188,9 @@ impl Database { } else { None }; - let wal_has_frames = maybe_shared_wal - .as_ref() - .is_some_and(|wal| unsafe { &*wal.get() }.max_frame.load(Ordering::SeqCst) > 0); - // No pages in DB file or WAL -> empty database - let db_state = if db_size == 0 && !wal_has_frames { + let db_size = db_file.size()?; + let db_state = if db_size == 0 { DB_STATE_UNINITIALIZED } else { DB_STATE_INITIALIZED @@ -228,10 +224,9 @@ impl Database { .expect("lock on schema should succeed first try"); let syms = conn.syms.borrow(); + let pager = conn.pager.borrow().clone(); - if let Err(LimboError::ExtensionError(e)) = - schema.make_from_btree(None, conn.pager.clone(), &syms) - { + if let Err(LimboError::ExtensionError(e)) = schema.make_from_btree(None, pager, &syms) { // this means that a vtab exists and we no longer have the module loaded. we print // a warning to the user to load the module eprintln!("Warning: {e}"); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 2b2d2cd32..9d9a927d5 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -9,7 +9,7 @@ use crate::types::CursorResult; use crate::Completion; use crate::{Buffer, Connection, LimboError, Result}; use parking_lot::RwLock; -use std::cell::{OnceCell, RefCell, UnsafeCell}; +use std::cell::{Cell, OnceCell, RefCell, UnsafeCell}; use std::collections::HashSet; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -225,7 +225,7 @@ pub struct Pager { /// Cache page_size and reserved_space at Pager init and reuse for subsequent /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality /// to change it. - page_size: RefCell>, + page_size: Cell>, reserved_space: OnceCell, } @@ -291,7 +291,7 @@ impl Pager { db_state, init_lock, allocate_page1_state, - page_size: RefCell::new(None), + page_size: Cell::new(None), reserved_space: OnceCell::new(), }) } @@ -586,7 +586,7 @@ impl Pager { pub fn usable_space(&self) -> usize { let page_size = *self .page_size - .borrow_mut() + .get() .get_or_insert_with(|| header_accessor::get_page_size(self).unwrap_or_default()); let reserved_space = *self @@ -598,6 +598,7 @@ impl Pager { /// Set the initial page size for the database. Should only be called before the database is initialized pub fn set_initial_page_size(&self, size: u32) { + assert_eq!(self.db_state.load(Ordering::SeqCst), DB_STATE_UNINITIALIZED); self.page_size.replace(Some(size)); } @@ -1063,7 +1064,7 @@ impl Pager { self.db_state.store(DB_STATE_INITIALIZING, Ordering::SeqCst); let mut default_header = DatabaseHeader::default(); default_header.database_size += 1; - if let Some(size) = *self.page_size.borrow() { + if let Some(size) = self.page_size.get() { default_header.update_page_size(size); } let page = allocate_page(1, &self.buffer_pool, 0); From d7bdfeb711f702bc5f5183c848a12c01764db54f Mon Sep 17 00:00:00 2001 From: meteorgan Date: Mon, 14 Jul 2025 21:05:43 +0800 Subject: [PATCH 8/8] reinitialize WalFileShare when reset page size --- core/lib.rs | 6 ++---- core/storage/header_accessor.rs | 19 ++++++++++++++++--- core/storage/wal.rs | 4 ---- testing/pragma.test | 4 +++- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 0e763a67a..3d169121a 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -274,10 +274,7 @@ impl Database { if let Some(shared_wal) = self.maybe_shared_wal.read().clone() { let size = match page_size { None => unsafe { (*shared_wal.get()).page_size() as usize }, - Some(size) => { - unsafe { (*shared_wal.get()).set_page_size(size as u32) }; - size - } + Some(size) => size, }; let buffer_pool = Arc::new(BufferPool::new(Some(size))); @@ -847,6 +844,7 @@ impl Connection { return Ok(()); } + *self._db.maybe_shared_wal.write() = None; let pager = self._db.init_pager(Some(size as usize))?; self.pager.replace(Rc::new(pager)); self.pager.borrow().set_initial_page_size(size); diff --git a/core/storage/header_accessor.rs b/core/storage/header_accessor.rs index 766a122b0..a2aba8013 100644 --- a/core/storage/header_accessor.rs +++ b/core/storage/header_accessor.rs @@ -159,7 +159,7 @@ macro_rules! impl_header_field_accessor { } // impl_header_field_accessor!(magic, [u8; 16], HEADER_OFFSET_MAGIC); -impl_header_field_accessor!(page_size_, u16, HEADER_OFFSET_PAGE_SIZE); +impl_header_field_accessor!(page_size_u16, u16, HEADER_OFFSET_PAGE_SIZE); impl_header_field_accessor!(write_version, u8, HEADER_OFFSET_WRITE_VERSION); impl_header_field_accessor!(read_version, u8, HEADER_OFFSET_READ_VERSION); impl_header_field_accessor!(reserved_space, u8, HEADER_OFFSET_RESERVED_SPACE); @@ -196,7 +196,7 @@ impl_header_field_accessor!(version_valid_for, u32, HEADER_OFFSET_VERSION_VALID_ impl_header_field_accessor!(version_number, u32, HEADER_OFFSET_VERSION_NUMBER); pub fn get_page_size(pager: &Pager) -> Result { - let size = get_page_size_(pager)?; + let size = get_page_size_u16(pager)?; if size == 1 { return Ok(MAX_PAGE_SIZE); } @@ -210,5 +210,18 @@ pub fn set_page_size(pager: &Pager, value: u32) -> Result<()> { } else { value as u16 }; - set_page_size_(pager, page_size) + set_page_size_u16(pager, page_size) +} + +#[allow(dead_code)] +pub fn get_page_size_async(pager: &Pager) -> Result> { + match get_page_size_u16_async(pager)? { + CursorResult::Ok(size) => { + if size == 1 { + return Ok(CursorResult::Ok(MAX_PAGE_SIZE)); + } + Ok(CursorResult::Ok(size as u32)) + } + CursorResult::IO => Ok(CursorResult::IO), + } } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 884df632b..45114fed4 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1123,8 +1123,4 @@ impl WalFileShared { pub fn page_size(&self) -> u32 { self.wal_header.lock().page_size } - - pub fn set_page_size(&self, page_size: u32) { - self.wal_header.lock().page_size = page_size; - } } diff --git a/testing/pragma.test b/testing/pragma.test index 98c34265f..879c0516e 100755 --- a/testing/pragma.test +++ b/testing/pragma.test @@ -189,8 +189,10 @@ do_execsql_test_on_specific_db ":memory:" pragma-page-size-set-initialized-db { # pragma page_size=xxx changes the page size of an uninitialized database and persists the change. set test_pragma_page_size_db "testing/testing_pragma_page_size.db" +catch {file delete -force $test_pragma_page_size_db} +catch {file delete -force "${test_pragma_page_size_db}-wal"} # set user_version to trigger database initialization. -do_execsql_test_on_specific_db $test_pragma_page_size_db pragma-page-size-set-initialized-db-1 { +do_execsql_test_on_specific_db $test_pragma_page_size_db pragma-page-size-set-uninitialized-db-1 { PRAGMA page_size=1024; PRAGMA user_version=1; PRAGMA page_size