Merge 'core/storage: Use AtomicU32 for Pager::page_size' from Pekka Enberg
Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>
Closes #3191
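In short, the commit replaces the `Cell<Option<PageSize>>` field on `Pager` with an `AtomicU32`, using 0 as the "not set" sentinel, and routes callers through new `get_page_size`, `get_page_size_unchecked`, and `set_page_size` helpers. A minimal standalone sketch of that pattern follows; the `PageSize` wrapper and `Pager` struct below are simplified stand-ins for illustration, not the crate's real types (the real `PageSize` presumably also validates SQLite's 512..=65536 power-of-two range):

    use std::sync::atomic::{AtomicU32, Ordering};

    // Simplified stand-in for the crate's PageSize newtype.
    #[derive(Clone, Copy, Debug)]
    struct PageSize(u32);

    impl PageSize {
        fn new(v: u32) -> Option<PageSize> {
            // SQLite page sizes are powers of two between 512 and 65536.
            (v >= 512 && v <= 65536 && v.is_power_of_two()).then_some(PageSize(v))
        }
        fn get(self) -> u32 {
            self.0
        }
    }

    // Stand-in pager holding the page size as an atomic; 0 means "not set".
    struct Pager {
        page_size: AtomicU32,
    }

    impl Pager {
        fn get_page_size(&self) -> Option<PageSize> {
            match self.page_size.load(Ordering::SeqCst) {
                0 => None,
                v => PageSize::new(v),
            }
        }
        fn set_page_size(&self, size: PageSize) {
            self.page_size.store(size.get(), Ordering::SeqCst);
        }
    }

    fn main() {
        let pager = Pager { page_size: AtomicU32::new(0) };
        assert!(pager.get_page_size().is_none()); // sentinel 0 reads back as None
        pager.set_page_size(PageSize::new(4096).unwrap());
        assert_eq!(pager.get_page_size().unwrap().get(), 4096);
    }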
@@ -478,7 +478,7 @@ impl Database {
     pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
         let pager = self.init_pager(None)?;
 
-        let page_size = pager.page_size.get().expect("page size not set");
+        let page_size = pager.get_page_size_unchecked();
 
         let default_cache_size = pager
             .io
@@ -646,7 +646,7 @@ impl Database {
             db_state,
             self.init_lock.clone(),
         )?;
-        pager.page_size.set(Some(page_size));
+        pager.set_page_size(page_size);
         if let Some(reserved_bytes) = reserved_bytes {
             pager.set_reserved_space_bytes(reserved_bytes);
         }
@@ -676,7 +676,7 @@ impl Database {
             Arc::new(Mutex::new(())),
         )?;
 
-        pager.page_size.set(Some(page_size));
+        pager.set_page_size(page_size);
         if let Some(reserved_bytes) = reserved_bytes {
             pager.set_reserved_space_bytes(reserved_bytes);
         }
@@ -5459,12 +5459,7 @@ impl BTreeCursor {
     fn get_immutable_record_or_create(&self) -> std::cell::RefMut<'_, Option<ImmutableRecord>> {
         let mut reusable_immutable_record = self.reusable_immutable_record.borrow_mut();
         if reusable_immutable_record.is_none() {
-            let page_size = self
-                .pager
-                .page_size
-                .get()
-                .expect("page size is not set")
-                .get();
+            let page_size = self.pager.get_page_size_unchecked().get();
             let record = ImmutableRecord::new(page_size as usize);
             reusable_immutable_record.replace(record);
         }
@@ -19,7 +19,7 @@ use std::cell::{Cell, RefCell, UnsafeCell};
 use std::collections::HashSet;
 use std::hash;
 use std::rc::Rc;
-use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use tracing::{instrument, trace, Level};
 
@@ -517,7 +517,7 @@ pub struct Pager {
     /// Cache page_size and reserved_space at Pager init and reuse for subsequent
     /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality
    /// to change it.
-    pub(crate) page_size: Cell<Option<PageSize>>,
+    pub(crate) page_size: AtomicU32,
     reserved_space: Cell<Option<u8>>,
     free_page_state: RefCell<FreePageState>,
     /// Maximum number of pages allowed in the database. Default is 1073741823 (SQLite default).
@@ -620,7 +620,7 @@ impl Pager {
             db_state,
             init_lock,
             allocate_page1_state,
-            page_size: Cell::new(None),
+            page_size: AtomicU32::new(0), // 0 means not set
             reserved_space: Cell::new(None),
             free_page_state: RefCell::new(FreePageState::Start),
             allocate_page_state: RwLock::new(AllocatePageState::Start),
@@ -988,10 +988,13 @@ impl Pager {
     /// The usable size of a page might be an odd number. However, the usable size is not allowed to be less than 480.
     /// In other words, if the page size is 512, then the reserved space size cannot exceed 32.
     pub fn usable_space(&self) -> usize {
-        let page_size = *self.page_size.get().get_or_insert_with(|| {
-            self.io
+        let page_size = self.get_page_size().unwrap_or_else(|| {
+            let size = self
+                .io
                 .block(|| self.with_header(|header| header.page_size))
-                .unwrap_or_default()
+                .unwrap_or_default();
+            self.page_size.store(size.get(), Ordering::SeqCst);
+            size
         });
 
         let reserved_space = *self.reserved_space.get().get_or_insert_with(|| {
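The rewritten usable_space() above first tries the cached atomic value and only falls back to reading the database header, publishing the result with a SeqCst store so later calls hit the cache. The shape of that fallback, as a rough standalone sketch, is shown below; read_page_size_from_header() here is a hypothetical stand-in for the header read performed via self.io.block(...) in the real code:

    use std::sync::atomic::{AtomicU32, Ordering};

    // Hypothetical stand-in for reading the page size out of the database header.
    fn read_page_size_from_header() -> u32 {
        4096
    }

    fn cached_page_size(cache: &AtomicU32) -> u32 {
        match cache.load(Ordering::SeqCst) {
            0 => {
                let size = read_page_size_from_header();
                // Two threads may race here, but both would store the same
                // header value, so the duplicated read is harmless.
                cache.store(size, Ordering::SeqCst);
                size
            }
            v => v,
        }
    }

    fn main() {
        let cache = AtomicU32::new(0);
        assert_eq!(cached_page_size(&cache), 4096); // first call reads the "header"
        assert_eq!(cache.load(Ordering::SeqCst), 4096); // and caches it for later calls
    }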
@@ -1006,7 +1009,29 @@ impl Pager {
     /// Set the initial page size for the database. Should only be called before the database is initialized
     pub fn set_initial_page_size(&self, size: PageSize) {
         assert_eq!(self.db_state.get(), DbState::Uninitialized);
-        self.page_size.replace(Some(size));
+        self.page_size.store(size.get(), Ordering::SeqCst);
     }
 
+    /// Get the current page size. Returns None if not set yet.
+    pub fn get_page_size(&self) -> Option<PageSize> {
+        let value = self.page_size.load(Ordering::SeqCst);
+        if value == 0 {
+            None
+        } else {
+            PageSize::new(value)
+        }
+    }
+
+    /// Get the current page size, panicking if not set.
+    pub fn get_page_size_unchecked(&self) -> PageSize {
+        let value = self.page_size.load(Ordering::SeqCst);
+        assert_ne!(value, 0, "page size not set");
+        PageSize::new(value).expect("invalid page size stored")
+    }
+
+    /// Set the page size. Used internally when page size is determined.
+    pub fn set_page_size(&self, size: PageSize) {
+        self.page_size.store(size.get(), Ordering::SeqCst);
+    }
+
     #[inline(always)]
@@ -1287,7 +1312,7 @@ impl Pager {
         let len = dirty_pages.len().min(IOV_MAX);
         let mut completions: Vec<Completion> = Vec::new();
         let mut pages = Vec::with_capacity(len);
-        let page_sz = self.page_size.get().unwrap_or_default();
+        let page_sz = self.get_page_size().unwrap_or_default();
 
         let prepare = wal.borrow_mut().prepare_wal_start(page_sz)?;
         if let Some(c) = prepare {
@@ -1384,7 +1409,7 @@ impl Pager {
         trace!(?state);
         match state {
             CommitState::PrepareWal => {
-                let page_sz = self.page_size.get().expect("page size not set");
+                let page_sz = self.get_page_size_unchecked();
                 let c = wal.borrow_mut().prepare_wal_start(page_sz)?;
                 let Some(c) = c else {
                     self.commit_info.state.set(CommitState::GetDbSize);
@@ -1419,7 +1444,7 @@ impl Pager {
 
                 let mut completions = self.commit_info.completions.borrow_mut();
                 completions.clear();
-                let page_sz = self.page_size.get().expect("page size not set");
+                let page_sz = self.get_page_size_unchecked();
                 let mut pages: Vec<PageRef> = Vec::with_capacity(dirty_ids.len().min(IOV_MAX));
                 let total = dirty_ids.len();
                 let mut cache = self.page_cache.write();
@@ -1747,7 +1772,7 @@ impl Pager {
             .io
             .block(|| self.with_header(|header| header.database_size))?
             .get();
-        let page_size = self.page_size.get().unwrap_or_default();
+        let page_size = self.get_page_size().unwrap_or_default();
         let expected = (db_size * page_size.get()) as u64;
         if expected < self.db_file.size()? {
             if !checkpoint_result.db_truncate_sent {
@@ -1942,7 +1967,7 @@ impl Pager {
         default_header.reserved_space = reserved_space_bytes;
         self.reserved_space.set(Some(reserved_space_bytes));
 
-        if let Some(size) = self.page_size.get() {
+        if let Some(size) = self.get_page_size() {
             default_header.page_size = size;
         }
 
@@ -2317,7 +2342,7 @@ impl Pager {
         cipher_mode: CipherMode,
         key: &EncryptionKey,
     ) -> Result<()> {
-        let page_size = self.page_size.get().unwrap().get() as usize;
+        let page_size = self.get_page_size_unchecked().get() as usize;
        let encryption_ctx = EncryptionContext::new(cipher_mode, key, page_size)?;
         {
             let mut io_ctx = self.io_ctx.borrow_mut();
@@ -1000,13 +1000,22 @@ pub fn write_pages_vectored(
        return Ok(Vec::new());
    }
 
-    let page_sz = pager.page_size.get().expect("page size is not set").get() as usize;
-    let run_count = batch
-        .keys()
-        .zip(batch.keys().skip(1))
-        .filter(|(&curr, &next)| next != curr + 1)
-        .count()
-        + 1;
+    let page_sz = pager.get_page_size_unchecked().get() as usize;
+    // Count expected number of runs to create the atomic counter we need to track each batch
+    let mut run_count = 0;
+    let mut prev_id = None;
+    for &id in batch.keys() {
+        if let Some(prev) = prev_id {
+            if id != prev + 1 {
+                run_count += 1;
+            }
+        } else {
+            run_count = 1; // First run
+        }
+        prev_id = Some(id);
+    }
+
+    // Create the atomic counters
     let runs_left = Arc::new(AtomicUsize::new(run_count));
 
     const EST_BUFF_CAPACITY: usize = 32;
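Both the removed iterator-based expression and the new explicit loop above count contiguous "runs" of page ids. A quick standalone check of that equivalence is sketched below, assuming batch is an ordered map keyed by page id (as the .keys() iteration suggests); the sample ids are chosen purely for illustration:

    use std::collections::BTreeMap;

    // Old formulation: number of gaps between consecutive keys, plus one.
    fn runs_zip(ids: &BTreeMap<usize, ()>) -> usize {
        ids.keys()
            .zip(ids.keys().skip(1))
            .filter(|(&curr, &next)| next != curr + 1)
            .count()
            + 1
    }

    // New formulation: walk the keys and bump the count at every gap.
    fn runs_loop(ids: &BTreeMap<usize, ()>) -> usize {
        let mut run_count = 0;
        let mut prev_id = None;
        for &id in ids.keys() {
            match prev_id {
                Some(prev) if id != prev + 1 => run_count += 1,
                None => run_count = 1, // first run
                _ => {}
            }
            prev_id = Some(id);
        }
        run_count
    }

    fn main() {
        // Pages 1-3, 7-8, and 10 form three contiguous runs.
        let ids: BTreeMap<usize, ()> = [1, 2, 3, 7, 8, 10].into_iter().map(|i| (i, ())).collect();
        assert_eq!(runs_zip(&ids), 3);
        assert_eq!(runs_loop(&ids), 3);
    }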
@@ -7232,7 +7232,7 @@ pub fn op_open_ephemeral(
        Arc::new(Mutex::new(())),
    )?);
 
-    pager.page_size.set(Some(page_size));
+    pager.set_page_size(page_size);
 
    state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager };
 }