mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
core/storage: Cache schema cookie in Pager
Every transaction was reading page 1 from the WAL to check the schema cookie in op_transaction, causing unnecessary WAL lookups. This commit caches the schema_cookie in Pager as AtomicU64, similar to how page_size and reserved_space are already cached. The cache is updated when the header is read/modified and invalidated in begin_read_tx() when WAL changes are detected from other connections. This matches SQLite's approach of caching frequently accessed header fields to avoid repeated page 1 reads. Improves write throughput by 5% in our benchmarks.
This commit is contained in:
@@ -537,6 +537,11 @@ pub struct Pager {
|
||||
/// to change it.
|
||||
pub(crate) page_size: AtomicU32,
|
||||
reserved_space: AtomicU16,
|
||||
/// Schema cookie cache.
|
||||
///
|
||||
/// Note that schema cookie is 32-bits, but we use 64-bit field so we can
|
||||
/// represent case where value is not set.
|
||||
schema_cookie: AtomicU64,
|
||||
free_page_state: RwLock<FreePageState>,
|
||||
/// Maximum number of pages allowed in the database. Default is 1073741823 (SQLite default).
|
||||
max_page_count: AtomicU32,
|
||||
@@ -650,6 +655,7 @@ impl Pager {
|
||||
allocate_page1_state,
|
||||
page_size: AtomicU32::new(0), // 0 means not set
|
||||
reserved_space: AtomicU16::new(RESERVED_SPACE_NOT_SET),
|
||||
schema_cookie: AtomicU64::new(Self::SCHEMA_COOKIE_NOT_SET),
|
||||
free_page_state: RwLock::new(FreePageState::Start),
|
||||
allocate_page_state: RwLock::new(AllocatePageState::Start),
|
||||
max_page_count: AtomicU32::new(DEFAULT_MAX_PAGE_COUNT),
|
||||
@@ -1115,6 +1121,41 @@ impl Pager {
|
||||
self.reserved_space.store(space as u16, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
/// Schema cookie sentinel value that represents value not set.
|
||||
const SCHEMA_COOKIE_NOT_SET: u64 = u64::MAX;
|
||||
|
||||
/// Get the cached schema cookie. Returns None if not set yet.
|
||||
pub fn get_schema_cookie_cached(&self) -> Option<u32> {
|
||||
let value = self.schema_cookie.load(Ordering::SeqCst);
|
||||
if value == Self::SCHEMA_COOKIE_NOT_SET {
|
||||
None
|
||||
} else {
|
||||
Some(value as u32)
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the schema cookie cache.
|
||||
pub fn set_schema_cookie(&self, cookie: Option<u32>) {
|
||||
match cookie {
|
||||
Some(value) => {
|
||||
self.schema_cookie.store(value as u64, Ordering::SeqCst);
|
||||
}
|
||||
None => self
|
||||
.schema_cookie
|
||||
.store(Self::SCHEMA_COOKIE_NOT_SET, Ordering::SeqCst),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the schema cookie, using the cached value if available to avoid reading page 1.
|
||||
pub fn get_schema_cookie(&self) -> Result<IOResult<u32>> {
|
||||
// Try to use cached value first
|
||||
if let Some(cookie) = self.get_schema_cookie_cached() {
|
||||
return Ok(IOResult::Done(cookie));
|
||||
}
|
||||
// If not cached, read from header and cache it
|
||||
self.with_header(|header| header.schema_cookie.get())
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn begin_read_tx(&self) -> Result<()> {
|
||||
@@ -1125,6 +1166,8 @@ impl Pager {
|
||||
if changed {
|
||||
// Someone else changed the database -> assume our page cache is invalid (this is default SQLite behavior, we can probably do better with more granular invalidation)
|
||||
self.clear_page_cache(false);
|
||||
// Invalidate cached schema cookie to force re-read on next access
|
||||
self.set_schema_cookie(None);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -2425,6 +2468,8 @@ impl Pager {
|
||||
);
|
||||
}
|
||||
self.reset_internal_states();
|
||||
// Invalidate cached schema cookie since rollback may have restored the database schema cookie
|
||||
self.set_schema_cookie(None);
|
||||
if schema_did_change {
|
||||
*connection.schema.write() = connection.db.clone_schema();
|
||||
}
|
||||
@@ -2456,13 +2501,18 @@ impl Pager {
|
||||
pub fn with_header<T>(&self, f: impl Fn(&DatabaseHeader) -> T) -> Result<IOResult<T>> {
|
||||
let header_ref = return_if_io!(HeaderRef::from_pager(self));
|
||||
let header = header_ref.borrow();
|
||||
// Update cached schema cookie when reading header
|
||||
self.set_schema_cookie(Some(header.schema_cookie.get()));
|
||||
Ok(IOResult::Done(f(header)))
|
||||
}
|
||||
|
||||
pub fn with_header_mut<T>(&self, f: impl Fn(&mut DatabaseHeader) -> T) -> Result<IOResult<T>> {
|
||||
let header_ref = return_if_io!(HeaderRefMut::from_pager(self));
|
||||
let header = header_ref.borrow_mut();
|
||||
Ok(IOResult::Done(f(header)))
|
||||
let result = f(header);
|
||||
// Update cached schema cookie after modification
|
||||
self.set_schema_cookie(Some(header.schema_cookie.get()));
|
||||
Ok(IOResult::Done(result))
|
||||
}
|
||||
|
||||
pub fn is_encryption_ctx_set(&self) -> bool {
|
||||
|
||||
@@ -2429,9 +2429,7 @@ pub fn op_transaction_inner(
|
||||
// Can only read header if page 1 has been allocated already
|
||||
// begin_write_tx that happens, but not begin_read_tx
|
||||
OpTransactionState::CheckSchemaCookie => {
|
||||
let res = with_header(&pager, mv_store, program, |header| {
|
||||
header.schema_cookie.get()
|
||||
});
|
||||
let res = get_schema_cookie(&pager, mv_store, program);
|
||||
match res {
|
||||
Ok(IOResult::Done(header_schema_cookie)) => {
|
||||
if header_schema_cookie != *schema_cookie {
|
||||
@@ -10224,6 +10222,21 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn get_schema_cookie(
|
||||
pager: &Arc<Pager>,
|
||||
mv_store: Option<&Arc<MvStore>>,
|
||||
program: &Program,
|
||||
) -> Result<IOResult<u32>> {
|
||||
if let Some(mv_store) = mv_store {
|
||||
let tx_id = program.connection.get_mv_tx_id();
|
||||
mv_store
|
||||
.with_header(|header| header.schema_cookie.get(), tx_id.as_ref())
|
||||
.map(IOResult::Done)
|
||||
} else {
|
||||
pager.get_schema_cookie()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use rand::{Rng, RngCore};
|
||||
|
||||
Reference in New Issue
Block a user