diff --git a/core/lib.rs b/core/lib.rs index 70f1774d6..f326452b7 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -478,7 +478,7 @@ impl Database { pub fn connect(self: &Arc) -> Result> { let pager = self.init_pager(None)?; - let page_size = pager.page_size.get().expect("page size not set"); + let page_size = pager.get_page_size_unchecked(); let default_cache_size = pager .io @@ -646,7 +646,7 @@ impl Database { db_state, self.init_lock.clone(), )?; - pager.page_size.set(Some(page_size)); + pager.set_page_size(page_size); if let Some(reserved_bytes) = reserved_bytes { pager.set_reserved_space_bytes(reserved_bytes); } @@ -676,7 +676,7 @@ impl Database { Arc::new(Mutex::new(())), )?; - pager.page_size.set(Some(page_size)); + pager.set_page_size(page_size); if let Some(reserved_bytes) = reserved_bytes { pager.set_reserved_space_bytes(reserved_bytes); } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 78b92fa80..d8119ff0d 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -5459,12 +5459,7 @@ impl BTreeCursor { fn get_immutable_record_or_create(&self) -> std::cell::RefMut<'_, Option> { let mut reusable_immutable_record = self.reusable_immutable_record.borrow_mut(); if reusable_immutable_record.is_none() { - let page_size = self - .pager - .page_size - .get() - .expect("page size is not set") - .get(); + let page_size = self.pager.get_page_size_unchecked().get(); let record = ImmutableRecord::new(page_size as usize); reusable_immutable_record.replace(record); } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 9ed7b162d..3422f7b4c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -19,7 +19,7 @@ use std::cell::{Cell, RefCell, UnsafeCell}; use std::collections::HashSet; use std::hash; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use tracing::{instrument, trace, Level}; @@ -517,7 +517,7 @@ pub struct Pager { /// Cache page_size and reserved_space at Pager init and reuse for subsequent /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality /// to change it. - pub(crate) page_size: Cell>, + pub(crate) page_size: AtomicU32, reserved_space: Cell>, free_page_state: RefCell, /// Maximum number of pages allowed in the database. Default is 1073741823 (SQLite default). @@ -620,7 +620,7 @@ impl Pager { db_state, init_lock, allocate_page1_state, - page_size: Cell::new(None), + page_size: AtomicU32::new(0), // 0 means not set reserved_space: Cell::new(None), free_page_state: RefCell::new(FreePageState::Start), allocate_page_state: RwLock::new(AllocatePageState::Start), @@ -988,10 +988,13 @@ impl Pager { /// The usable size of a page might be an odd number. However, the usable size is not allowed to be less than 480. /// In other words, if the page size is 512, then the reserved space size cannot exceed 32. pub fn usable_space(&self) -> usize { - let page_size = *self.page_size.get().get_or_insert_with(|| { - self.io + let page_size = self.get_page_size().unwrap_or_else(|| { + let size = self + .io .block(|| self.with_header(|header| header.page_size)) - .unwrap_or_default() + .unwrap_or_default(); + self.page_size.store(size.get(), Ordering::SeqCst); + size }); let reserved_space = *self.reserved_space.get().get_or_insert_with(|| { @@ -1006,7 +1009,29 @@ impl Pager { /// Set the initial page size for the database. Should only be called before the database is initialized pub fn set_initial_page_size(&self, size: PageSize) { assert_eq!(self.db_state.get(), DbState::Uninitialized); - self.page_size.replace(Some(size)); + self.page_size.store(size.get(), Ordering::SeqCst); + } + + /// Get the current page size. Returns None if not set yet. + pub fn get_page_size(&self) -> Option { + let value = self.page_size.load(Ordering::SeqCst); + if value == 0 { + None + } else { + PageSize::new(value) + } + } + + /// Get the current page size, panicking if not set. + pub fn get_page_size_unchecked(&self) -> PageSize { + let value = self.page_size.load(Ordering::SeqCst); + assert_ne!(value, 0, "page size not set"); + PageSize::new(value).expect("invalid page size stored") + } + + /// Set the page size. Used internally when page size is determined. + pub fn set_page_size(&self, size: PageSize) { + self.page_size.store(size.get(), Ordering::SeqCst); } #[inline(always)] @@ -1287,7 +1312,7 @@ impl Pager { let len = dirty_pages.len().min(IOV_MAX); let mut completions: Vec = Vec::new(); let mut pages = Vec::with_capacity(len); - let page_sz = self.page_size.get().unwrap_or_default(); + let page_sz = self.get_page_size().unwrap_or_default(); let prepare = wal.borrow_mut().prepare_wal_start(page_sz)?; if let Some(c) = prepare { @@ -1384,7 +1409,7 @@ impl Pager { trace!(?state); match state { CommitState::PrepareWal => { - let page_sz = self.page_size.get().expect("page size not set"); + let page_sz = self.get_page_size_unchecked(); let c = wal.borrow_mut().prepare_wal_start(page_sz)?; let Some(c) = c else { self.commit_info.state.set(CommitState::GetDbSize); @@ -1419,7 +1444,7 @@ impl Pager { let mut completions = self.commit_info.completions.borrow_mut(); completions.clear(); - let page_sz = self.page_size.get().expect("page size not set"); + let page_sz = self.get_page_size_unchecked(); let mut pages: Vec = Vec::with_capacity(dirty_ids.len().min(IOV_MAX)); let total = dirty_ids.len(); let mut cache = self.page_cache.write(); @@ -1747,7 +1772,7 @@ impl Pager { .io .block(|| self.with_header(|header| header.database_size))? .get(); - let page_size = self.page_size.get().unwrap_or_default(); + let page_size = self.get_page_size().unwrap_or_default(); let expected = (db_size * page_size.get()) as u64; if expected < self.db_file.size()? { if !checkpoint_result.db_truncate_sent { @@ -1942,7 +1967,7 @@ impl Pager { default_header.reserved_space = reserved_space_bytes; self.reserved_space.set(Some(reserved_space_bytes)); - if let Some(size) = self.page_size.get() { + if let Some(size) = self.get_page_size() { default_header.page_size = size; } @@ -2317,7 +2342,7 @@ impl Pager { cipher_mode: CipherMode, key: &EncryptionKey, ) -> Result<()> { - let page_size = self.page_size.get().unwrap().get() as usize; + let page_size = self.get_page_size_unchecked().get() as usize; let encryption_ctx = EncryptionContext::new(cipher_mode, key, page_size)?; { let mut io_ctx = self.io_ctx.borrow_mut(); diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 56316239e..a33b9398c 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1000,13 +1000,22 @@ pub fn write_pages_vectored( return Ok(Vec::new()); } - let page_sz = pager.page_size.get().expect("page size is not set").get() as usize; - let run_count = batch - .keys() - .zip(batch.keys().skip(1)) - .filter(|(&curr, &next)| next != curr + 1) - .count() - + 1; + let page_sz = pager.get_page_size_unchecked().get() as usize; + // Count expected number of runs to create the atomic counter we need to track each batch + let mut run_count = 0; + let mut prev_id = None; + for &id in batch.keys() { + if let Some(prev) = prev_id { + if id != prev + 1 { + run_count += 1; + } + } else { + run_count = 1; // First run + } + prev_id = Some(id); + } + + // Create the atomic counters let runs_left = Arc::new(AtomicUsize::new(run_count)); const EST_BUFF_CAPACITY: usize = 32; diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 82025f40e..650e2e1d8 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -7232,7 +7232,7 @@ pub fn op_open_ephemeral( Arc::new(Mutex::new(())), )?); - pager.page_size.set(Some(page_size)); + pager.set_page_size(page_size); state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager }; }