Merge 'Direct DatabaseHeader reads and writes – with_header and with_header_mut' from Levy A.

This PR introduces two methods to the pager, very much inspired by
`with_schema` and `with_schema_mut`. `Pager::with_header` and
`Pager::with_header_mut` give the closure a shared and a unique
reference, respectively, transmuted from the `PageRef` buffer.
This PR also adds type-safe wrappers for `Version`, `PageSize`,
`CacheSize` and `TextEncoding`, as they have special in-memory
representations.
Writing the `DatabaseHeader` is just a single `memcpy` now.
```rs
pub fn write_database_header(&self, header: &DatabaseHeader) {
    let buf = self.as_ptr();
    buf[0..DatabaseHeader::SIZE].copy_from_slice(bytemuck::bytes_of(header));
}
```
`HeaderRef` and `HeaderRefMut` are used in the `with_header*` methods,
but they can also be used on their own when there are multiple reads and
writes to the header, in cases where putting everything in a closure
would add too much nesting.

Reviewed-by: Preston Thorpe (@PThorpe92)

Closes #2234
This commit is contained in:
Jussi Saurio
2025-07-31 10:02:47 +03:00
11 changed files with 601 additions and 604 deletions

29
Cargo.lock generated
View File

@@ -382,9 +382,23 @@ checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
[[package]]
name = "bytemuck"
version = "1.22.0"
version = "1.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6b1fc10dbac614ebc03540c9dbd60e83887fda27794998c6528f1782047d540"
checksum = "5c76a5792e44e4abe34d3abf15636779261d45a7450612059293d1d2cfc63422"
dependencies = [
"bytemuck_derive",
]
[[package]]
name = "bytemuck_derive"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ecc273b49b3205b83d648f0690daa588925572cc5063745bfe547fe7ec8e1a1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "byteorder"
@@ -2676,6 +2690,15 @@ version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1036865bb9422d3300cf723f657c2851d0e9ab12567854b1f4eba3d77decf564"
[[package]]
name = "pack1"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6e7cd9bd638dc2c831519a0caa1c006cab771a92b1303403a8322773c5b72d6"
dependencies = [
"bytemuck",
]
[[package]]
name = "parking_lot"
version = "0.12.3"
@@ -4219,6 +4242,7 @@ dependencies = [
"antithesis_sdk",
"bitflags 2.9.0",
"built",
"bytemuck",
"cfg_block",
"chrono",
"criterion",
@@ -4236,6 +4260,7 @@ dependencies = [
"memory-stats",
"miette",
"mimalloc",
"pack1",
"parking_lot",
"paste",
"polling",

View File

@@ -71,6 +71,8 @@ serde = { workspace = true, optional = true, features = ["derive"] }
paste = "1.0.15"
uuid = { version = "1.11.0", features = ["v4", "v7"], optional = true }
tempfile = "3.8.0"
pack1 = { version = "1.0.0", features = ["bytemuck"] }
bytemuck = "1.23.1"
[build-dependencies]
chrono = { version = "0.4.38", default-features = false }

View File

@@ -41,15 +41,13 @@ mod numeric;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use crate::storage::header_accessor::get_schema_cookie;
use crate::storage::sqlite3_ondisk::is_valid_page_size;
use crate::storage::{header_accessor, wal::DummyWAL};
use crate::storage::wal::DummyWAL;
use crate::translate::optimizer::optimize_plan;
use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME;
#[cfg(feature = "fs")]
use crate::types::WalInsertInfo;
#[cfg(feature = "fs")]
use crate::util::{IOExt, OpenMode, OpenOptions};
use crate::util::{OpenMode, OpenOptions};
use crate::vtab::VirtualTable;
use core::str;
pub use error::LimboError;
@@ -80,6 +78,7 @@ use std::{
use storage::database::DatabaseFile;
use storage::page_cache::DumbLruPageCache;
use storage::pager::{AtomicDbState, DbState};
use storage::sqlite3_ondisk::PageSize;
pub use storage::{
buffer_pool::BufferPool,
database::DatabaseStorage,
@@ -93,7 +92,7 @@ use turso_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
use types::IOResult;
pub use types::RefValue;
pub use types::Value;
use util::parse_schema_rows;
use util::{parse_schema_rows, IOExt as _};
use vdbe::builder::QueryMode;
use vdbe::builder::TableRefIdCounter;
@@ -333,10 +332,17 @@ impl Database {
pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
let pager = self.init_pager(None)?;
let page_size = header_accessor::get_page_size(&pager)
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE);
let default_cache_size = header_accessor::get_default_page_cache_size(&pager)
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_CACHE_SIZE);
let page_size = pager
.io
.block(|| pager.with_header(|header| header.page_size))
.unwrap_or_default()
.get();
let default_cache_size = pager
.io
.block(|| pager.with_header(|header| header.default_page_cache_size))
.unwrap_or_default()
.get();
let conn = Arc::new(Connection {
_db: self.clone(),
@@ -419,8 +425,11 @@ impl Database {
let size = match page_size {
Some(size) => size as u32,
None => {
let size = header_accessor::get_page_size(&pager)
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE);
let size = pager
.io
.block(|| pager.with_header(|header| header.page_size))
.unwrap_or_default()
.get();
buffer_pool.set_page_size(size as usize);
size
}
@@ -807,10 +816,12 @@ impl Connection {
// first, quickly read schema_version from the root page in order to check if schema changed
pager.begin_read_tx()?;
let db_schema_version = get_schema_cookie(&pager);
let db_schema_version = pager
.io
.block(|| pager.with_header(|header| header.schema_cookie));
pager.end_read_tx().expect("read txn must be finished");
let db_schema_version = db_schema_version?;
let db_schema_version = db_schema_version?.get();
let conn_schema_version = self.schema.borrow().schema_version;
turso_assert!(
conn_schema_version <= db_schema_version,
@@ -838,7 +849,10 @@ impl Connection {
let mut fresh = Schema::new(false); // todo: indices!
// read cookie before consuming statement program - otherwise we can end up reading cookie with closed transaction state
let cookie = get_schema_cookie(&pager)?;
let cookie = pager
.io
.block(|| pager.with_header(|header| header.schema_cookie))?
.get();
// TODO: This function below is synchronous, make it async
parse_schema_rows(stmt, &mut fresh, &self.syms.borrow(), None)?;
@@ -1315,7 +1329,7 @@ impl Connection {
/// is first created, if it does not already exist when the page_size pragma is issued,
/// or at the next VACUUM command that is run on the same database connection while not in WAL mode.
pub fn reset_page_size(&self, size: u32) -> Result<()> {
if !is_valid_page_size(size) {
if PageSize::new(size).is_none() {
return Ok(());
}

View File

@@ -4,11 +4,10 @@ use tracing::{instrument, Level};
use crate::{
schema::Index,
storage::{
header_accessor,
pager::{BtreePageAllocMode, Pager},
sqlite3_ondisk::{
read_u32, read_varint, BTreeCell, PageContent, PageType, TableInteriorCell,
TableLeafCell, CELL_PTR_SIZE_BYTES, INTERIOR_PAGE_HEADER_SIZE_BYTES,
read_u32, read_varint, BTreeCell, DatabaseHeader, PageContent, PageType,
TableInteriorCell, TableLeafCell, CELL_PTR_SIZE_BYTES, INTERIOR_PAGE_HEADER_SIZE_BYTES,
LEAF_PAGE_HEADER_SIZE_BYTES, LEFT_CHILD_PTR_SIZE_BYTES,
},
},
@@ -18,6 +17,7 @@ use crate::{
find_compare, get_tie_breaker_from_seek_op, IndexInfo, ParseRecordState, RecordCompare,
RecordCursor, SeekResult,
},
util::IOExt,
Completion, MvCursor,
};
@@ -30,8 +30,7 @@ use crate::{
use super::{
pager::PageRef,
sqlite3_ondisk::{
write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, DATABASE_HEADER_SIZE,
MINIMUM_CELL_SIZE,
write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, MINIMUM_CELL_SIZE,
},
};
#[cfg(debug_assertions)]
@@ -3334,7 +3333,11 @@ impl BTreeCursor {
"left pointer is not the same as page id"
);
// FIXME: remove this lock
let database_size = header_accessor::get_database_size(&self.pager)?;
let database_size = self
.pager
.io
.block(|| self.pager.with_header(|header| header.database_size))?
.get();
turso_assert!(
left_pointer <= database_size,
"invalid page number divider left pointer {} > database number of pages {}",
@@ -3493,7 +3496,7 @@ impl BTreeCursor {
// sub-algorithm in some documentation.
assert!(sibling_count_new == 1);
let parent_offset = if parent_page.get().id == 1 {
DATABASE_HEADER_SIZE
DatabaseHeader::SIZE
} else {
0
};
@@ -4044,7 +4047,7 @@ impl BTreeCursor {
current_root.get().get().id == 1
};
let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 };
let offset = if is_page_1 { DatabaseHeader::SIZE } else { 0 };
let root_btree = self.stack.top();
let root = root_btree.get();
@@ -4935,7 +4938,11 @@ impl BTreeCursor {
OverflowState::ProcessPage { next_page } => {
if next_page < 2
|| next_page as usize
> header_accessor::get_database_size(&self.pager)? as usize
> self
.pager
.io
.block(|| self.pager.with_header(|header| header.database_size))?
.get() as usize
{
self.overflow_state = None;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
@@ -6995,9 +7002,9 @@ mod tests {
database::DatabaseFile,
page_cache::DumbLruPageCache,
pager::{AtomicDbState, DbState},
sqlite3_ondisk::PageSize,
},
types::Text,
util::IOExt as _,
vdbe::Register,
BufferPool, Completion, Connection, StepResult, WalFile, WalFileShared,
};
@@ -7364,7 +7371,10 @@ mod tests {
// Create cursor for the table
let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);
let initial_pagecount = header_accessor::get_database_size(&pager).unwrap();
let initial_pagecount = pager
.io
.block(|| pager.with_header(|header| header.database_size.get()))
.unwrap();
assert_eq!(
initial_pagecount, 2,
"Page count should be 2 after initial insert, was {initial_pagecount}"
@@ -7385,12 +7395,18 @@ mod tests {
// Verify that overflow pages were created by checking freelist count
// The freelist count should be 0 initially, and after inserting a large record,
// some pages should be allocated for overflow, but they won't be in freelist yet
let freelist_after_insert = header_accessor::get_freelist_pages(&pager).unwrap();
let freelist_after_insert = pager
.io
.block(|| pager.with_header(|header| header.freelist_pages.get()))
.unwrap();
assert_eq!(
freelist_after_insert, 0,
"Freelist count should be 0 after insert, was {freelist_after_insert}"
);
let pagecount_after_insert = header_accessor::get_database_size(&pager).unwrap();
let pagecount_after_insert = pager
.io
.block(|| pager.with_header(|header| header.database_size.get()))
.unwrap();
const EXPECTED_OVERFLOW_PAGES: u32 = 3;
assert_eq!(
pagecount_after_insert,
@@ -7419,7 +7435,10 @@ mod tests {
run_until_done(|| cursor.insert(&key, true), pager.deref()).unwrap();
// Check that the freelist count has increased, indicating overflow pages were cleared
let freelist_after_overwrite = header_accessor::get_freelist_pages(&pager).unwrap();
let freelist_after_overwrite = pager
.io
.block(|| pager.with_header(|header| header.freelist_pages.get()))
.unwrap();
assert_eq!(freelist_after_overwrite, EXPECTED_OVERFLOW_PAGES, "Freelist count should be {EXPECTED_OVERFLOW_PAGES} after overwrite, was {freelist_after_overwrite}");
// Verify the record was actually overwritten by reading it back
@@ -8285,7 +8304,12 @@ mod tests {
let res = pager.allocate_page().unwrap();
}
header_accessor::set_page_size(&pager, page_size).unwrap();
pager
.io
.block(|| {
pager.with_header_mut(|header| header.page_size = PageSize::new(page_size).unwrap())
})
.unwrap();
pager
}
@@ -8309,7 +8333,10 @@ mod tests {
let drop_fn = Rc::new(|_buf| {});
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::allocate(
header_accessor::get_page_size(&pager)? as usize,
pager
.io
.block(|| pager.with_header(|header| header.page_size))?
.get() as usize,
drop_fn,
)));
let c = Completion::new_write(|_| {});
@@ -8351,20 +8378,35 @@ mod tests {
payload_size: large_payload.len() as u64,
});
let initial_freelist_pages = header_accessor::get_freelist_pages(&pager)?;
let initial_freelist_pages = pager
.io
.block(|| pager.with_header(|header| header.freelist_pages))?
.get();
// Clear overflow pages
let clear_result = cursor.clear_overflow_pages(&leaf_cell)?;
match clear_result {
IOResult::Done(_) => {
let (freelist_pages, freelist_trunk_page) = pager
.io
.block(|| {
pager.with_header(|header| {
(
header.freelist_pages.get(),
header.freelist_trunk_page.get(),
)
})
})
.unwrap();
// Verify proper number of pages were added to freelist
assert_eq!(
header_accessor::get_freelist_pages(&pager)?,
freelist_pages,
initial_freelist_pages + 3,
"Expected 3 pages to be added to freelist"
);
// If this is first trunk page
let trunk_page_id = header_accessor::get_freelist_trunk_page(&pager)?;
let trunk_page_id = freelist_trunk_page;
if trunk_page_id > 0 {
// Verify trunk page structure
let (trunk_page, c) = cursor.read_page(trunk_page_id as usize)?;
@@ -8408,23 +8450,33 @@ mod tests {
payload_size: small_payload.len() as u64,
});
let initial_freelist_pages = header_accessor::get_freelist_pages(&pager)?;
let initial_freelist_pages = pager
.io
.block(|| pager.with_header(|header| header.freelist_pages))?
.get() as usize;
// Try to clear non-existent overflow pages
let clear_result = cursor.clear_overflow_pages(&leaf_cell)?;
match clear_result {
IOResult::Done(_) => {
let (freelist_pages, freelist_trunk_page) = pager.io.block(|| {
pager.with_header(|header| {
(
header.freelist_pages.get(),
header.freelist_trunk_page.get(),
)
})
})?;
// Verify freelist was not modified
assert_eq!(
header_accessor::get_freelist_pages(&pager)?,
initial_freelist_pages,
freelist_pages as usize, initial_freelist_pages,
"Freelist should not change when no overflow pages exist"
);
// Verify trunk page wasn't created
assert_eq!(
header_accessor::get_freelist_trunk_page(&pager)?,
0,
freelist_trunk_page, 0,
"No trunk page should be created when no overflow pages exist"
);
}
@@ -8504,18 +8556,28 @@ mod tests {
// Verify structure before destruction
assert_eq!(
header_accessor::get_database_size(&pager)?,
pager
.io
.block(|| pager.with_header(|header| header.database_size))?
.get(),
4, // We should have pages 1-4
"Database should have 4 pages total"
);
// Track freelist state before destruction
let initial_free_pages = header_accessor::get_freelist_pages(&pager)?;
let initial_free_pages = pager
.io
.block(|| pager.with_header(|header| header.freelist_pages))?
.get();
assert_eq!(initial_free_pages, 0, "should start with no free pages");
run_until_done(|| cursor.btree_destroy(), pager.deref())?;
let pages_freed = header_accessor::get_freelist_pages(&pager)? - initial_free_pages;
let pages_freed = pager
.io
.block(|| pager.with_header(|header| header.freelist_pages))?
.get()
- initial_free_pages;
assert_eq!(pages_freed, 3, "should free 3 pages (root + 2 leaves)");
Ok(())

View File

@@ -3,18 +3,18 @@ use parking_lot::Mutex;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::sqlite3_ondisk::PageSize;
pub struct BufferPool {
pub free_buffers: Mutex<Vec<BufferData>>,
page_size: AtomicUsize,
}
const DEFAULT_PAGE_SIZE: usize = 4096;
impl BufferPool {
pub fn new(page_size: Option<usize>) -> Self {
Self {
free_buffers: Mutex::new(Vec::new()),
page_size: AtomicUsize::new(page_size.unwrap_or(DEFAULT_PAGE_SIZE)),
page_size: AtomicUsize::new(page_size.unwrap_or(PageSize::DEFAULT as usize)),
}
}

View File

@@ -1,230 +1,75 @@
use crate::storage::sqlite3_ondisk::MAX_PAGE_SIZE;
use super::sqlite3_ondisk::{DatabaseHeader, PageContent};
use crate::turso_assert;
use crate::{
storage::{
self,
pager::{PageRef, Pager},
sqlite3_ondisk::DATABASE_HEADER_PAGE_ID,
},
storage::pager::{PageRef, Pager},
types::IOResult,
LimboError, Result,
};
use std::cell::{Ref, RefMut};
// const HEADER_OFFSET_MAGIC: usize = 0;
const HEADER_OFFSET_PAGE_SIZE: usize = 16;
const HEADER_OFFSET_WRITE_VERSION: usize = 18;
const HEADER_OFFSET_READ_VERSION: usize = 19;
const HEADER_OFFSET_RESERVED_SPACE: usize = 20;
const HEADER_OFFSET_MAX_EMBED_FRAC: usize = 21;
const HEADER_OFFSET_MIN_EMBED_FRAC: usize = 22;
const HEADER_OFFSET_MIN_LEAF_FRAC: usize = 23;
const HEADER_OFFSET_CHANGE_COUNTER: usize = 24;
const HEADER_OFFSET_DATABASE_SIZE: usize = 28;
const HEADER_OFFSET_FREELIST_TRUNK_PAGE: usize = 32;
const HEADER_OFFSET_FREELIST_PAGES: usize = 36;
const HEADER_OFFSET_SCHEMA_COOKIE: usize = 40;
const HEADER_OFFSET_SCHEMA_FORMAT: usize = 44;
const HEADER_OFFSET_DEFAULT_PAGE_CACHE_SIZE: usize = 48;
const HEADER_OFFSET_VACUUM_MODE_LARGEST_ROOT_PAGE: usize = 52;
const HEADER_OFFSET_TEXT_ENCODING: usize = 56;
const HEADER_OFFSET_USER_VERSION: usize = 60;
const HEADER_OFFSET_INCREMENTAL_VACUUM_ENABLED: usize = 64;
const HEADER_OFFSET_APPLICATION_ID: usize = 68;
//const HEADER_OFFSET_RESERVED_FOR_EXPANSION: usize = 72;
const HEADER_OFFSET_VERSION_VALID_FOR: usize = 92;
const HEADER_OFFSET_VERSION_NUMBER: usize = 96;
pub struct HeaderRef(PageRef);
// Helper to get a read-only reference to the header page.
fn get_header_page(pager: &Pager) -> Result<IOResult<PageRef>> {
if !pager.db_state.is_initialized() {
return Err(LimboError::InternalError(
"Database is empty, header does not exist - page 1 should've been allocated before this".to_string(),
));
}
let (page, c) = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
if page.is_locked() {
return Ok(IOResult::IO);
}
Ok(IOResult::Done(page))
}
// Helper to get a writable reference to the header page and mark it dirty.
fn get_header_page_for_write(pager: &Pager) -> Result<IOResult<PageRef>> {
if !pager.db_state.is_initialized() {
// This should not be called on an empty DB for writing, as page 1 is allocated on first transaction.
return Err(LimboError::InternalError(
"Cannot write to header of an empty database - page 1 should've been allocated before this".to_string(),
));
}
let (page, c) = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
if page.is_locked() {
return Ok(IOResult::IO);
}
turso_assert!(
page.get().id == DATABASE_HEADER_PAGE_ID,
"page must have number 1"
);
pager.add_dirty(&page);
Ok(IOResult::Done(page))
}
/// Helper function to run async header accessors until completion
fn run_header_accessor_until_done<T, F>(pager: &Pager, mut accessor: F) -> Result<T>
where
F: FnMut() -> Result<IOResult<T>>,
{
loop {
match accessor()? {
IOResult::Done(value) => return Ok(value),
IOResult::IO => {
pager.io.run_once()?;
}
impl HeaderRef {
pub fn from_pager(pager: &Pager) -> Result<IOResult<Self>> {
if !pager.db_state.is_initialized() {
return Err(LimboError::InternalError(
"Database is empty, header does not exist - page 1 should've been allocated before this".to_string()
));
}
}
}
/// Helper macro to implement getters and setters for header fields.
/// For example, `impl_header_field_accessor!(page_size, u16, HEADER_OFFSET_PAGE_SIZE);`
/// will generate the following functions:
/// - `pub fn get_page_size(pager: &Pager) -> Result<u16>` (sync)
/// - `pub fn get_page_size_async(pager: &Pager) -> Result<CursorResult<u16>>` (async)
/// - `pub fn set_page_size(pager: &Pager, value: u16) -> Result<()>` (sync)
/// - `pub fn set_page_size_async(pager: &Pager, value: u16) -> Result<CursorResult<()>>` (async)
///
/// The macro takes three required arguments:
/// - `$field_name`: The name of the field to implement.
/// - `$type`: The type of the field.
/// - `$offset`: The offset of the field in the header page.
///
/// And a fourth optional argument:
/// - `$ifzero`: A value to return if the field is 0.
///
/// The macro will generate both sync and async versions of the functions.
///
macro_rules! impl_header_field_accessor {
($field_name:ident, $type:ty, $offset:expr $(, $ifzero:expr)?) => {
paste::paste! {
// Async version
#[allow(dead_code)]
pub fn [<get_ $field_name _async>](pager: &Pager) -> Result<IOResult<$type>> {
if !pager.db_state.is_initialized() {
return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this")));
}
let page = match get_header_page(pager)? {
IOResult::Done(page) => page,
IOResult::IO => return Ok(IOResult::IO),
};
let page_inner = page.get();
let page_content = page_inner.contents.as_ref().unwrap();
let buf = page_content.buffer.borrow();
let buf_slice = buf.as_slice();
let mut bytes = [0; std::mem::size_of::<$type>()];
bytes.copy_from_slice(&buf_slice[$offset..$offset + std::mem::size_of::<$type>()]);
let value = <$type>::from_be_bytes(bytes);
$(
if value == 0 {
return Ok(IOResult::Done($ifzero));
}
)?
Ok(IOResult::Done(value))
}
// Sync version
#[allow(dead_code)]
pub fn [<get_ $field_name>](pager: &Pager) -> Result<$type> {
run_header_accessor_until_done(pager, || [<get_ $field_name _async>](pager))
}
// Async setter
#[allow(dead_code)]
pub fn [<set_ $field_name _async>](pager: &Pager, value: $type) -> Result<IOResult<()>> {
let page = match get_header_page_for_write(pager)? {
IOResult::Done(page) => page,
IOResult::IO => return Ok(IOResult::IO),
};
let page_inner = page.get();
let page_content = page_inner.contents.as_ref().unwrap();
let mut buf = page_content.buffer.borrow_mut();
let buf_slice = buf.as_mut_slice();
buf_slice[$offset..$offset + std::mem::size_of::<$type>()].copy_from_slice(&value.to_be_bytes());
turso_assert!(page.get().id == 1, "page must have number 1");
pager.add_dirty(&page);
Ok(IOResult::Done(()))
}
// Sync setter
#[allow(dead_code)]
pub fn [<set_ $field_name>](pager: &Pager, value: $type) -> Result<()> {
run_header_accessor_until_done(pager, || [<set_ $field_name _async>](pager, value))
}
let (page, _c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
if page.is_locked() {
return Ok(IOResult::IO);
}
};
}
// impl_header_field_accessor!(magic, [u8; 16], HEADER_OFFSET_MAGIC);
impl_header_field_accessor!(page_size_u16, u16, HEADER_OFFSET_PAGE_SIZE);
impl_header_field_accessor!(write_version, u8, HEADER_OFFSET_WRITE_VERSION);
impl_header_field_accessor!(read_version, u8, HEADER_OFFSET_READ_VERSION);
impl_header_field_accessor!(reserved_space, u8, HEADER_OFFSET_RESERVED_SPACE);
impl_header_field_accessor!(max_embed_frac, u8, HEADER_OFFSET_MAX_EMBED_FRAC);
impl_header_field_accessor!(min_embed_frac, u8, HEADER_OFFSET_MIN_EMBED_FRAC);
impl_header_field_accessor!(min_leaf_frac, u8, HEADER_OFFSET_MIN_LEAF_FRAC);
impl_header_field_accessor!(change_counter, u32, HEADER_OFFSET_CHANGE_COUNTER);
impl_header_field_accessor!(database_size, u32, HEADER_OFFSET_DATABASE_SIZE);
impl_header_field_accessor!(freelist_trunk_page, u32, HEADER_OFFSET_FREELIST_TRUNK_PAGE);
impl_header_field_accessor!(freelist_pages, u32, HEADER_OFFSET_FREELIST_PAGES);
impl_header_field_accessor!(schema_cookie, u32, HEADER_OFFSET_SCHEMA_COOKIE);
impl_header_field_accessor!(schema_format, u32, HEADER_OFFSET_SCHEMA_FORMAT);
impl_header_field_accessor!(
default_page_cache_size,
i32,
HEADER_OFFSET_DEFAULT_PAGE_CACHE_SIZE,
storage::sqlite3_ondisk::DEFAULT_CACHE_SIZE
);
impl_header_field_accessor!(
vacuum_mode_largest_root_page,
u32,
HEADER_OFFSET_VACUUM_MODE_LARGEST_ROOT_PAGE
);
impl_header_field_accessor!(text_encoding, u32, HEADER_OFFSET_TEXT_ENCODING);
impl_header_field_accessor!(user_version, i32, HEADER_OFFSET_USER_VERSION);
impl_header_field_accessor!(
incremental_vacuum_enabled,
u32,
HEADER_OFFSET_INCREMENTAL_VACUUM_ENABLED
);
impl_header_field_accessor!(application_id, i32, HEADER_OFFSET_APPLICATION_ID);
//impl_header_field_accessor!(reserved_for_expansion, [u8; 20], HEADER_OFFSET_RESERVED_FOR_EXPANSION);
impl_header_field_accessor!(version_valid_for, u32, HEADER_OFFSET_VERSION_VALID_FOR);
impl_header_field_accessor!(version_number, u32, HEADER_OFFSET_VERSION_NUMBER);
turso_assert!(
page.get().id == DatabaseHeader::PAGE_ID,
"incorrect header page id"
);
pub fn get_page_size(pager: &Pager) -> Result<u32> {
let size = get_page_size_u16(pager)?;
if size == 1 {
return Ok(MAX_PAGE_SIZE);
Ok(IOResult::Done(Self(page)))
}
pub fn borrow(&self) -> Ref<'_, DatabaseHeader> {
// TODO: Instead of erasing mutability, implement `get_mut_contents` and return a shared reference.
let content: &PageContent = self.0.get_contents();
Ref::map(content.buffer.borrow(), |buffer| {
bytemuck::from_bytes::<DatabaseHeader>(&buffer.as_slice()[0..DatabaseHeader::SIZE])
})
}
Ok(size as u32)
}
#[allow(dead_code)]
pub fn set_page_size(pager: &Pager, value: u32) -> Result<()> {
let page_size = if value == MAX_PAGE_SIZE {
1
} else {
value as u16
};
set_page_size_u16(pager, page_size)
}
pub struct HeaderRefMut(PageRef);
#[allow(dead_code)]
pub fn get_page_size_async(pager: &Pager) -> Result<IOResult<u32>> {
match get_page_size_u16_async(pager)? {
IOResult::Done(size) => {
if size == 1 {
return Ok(IOResult::Done(MAX_PAGE_SIZE));
}
Ok(IOResult::Done(size as u32))
impl HeaderRefMut {
pub fn from_pager(pager: &Pager) -> Result<IOResult<Self>> {
if !pager.db_state.is_initialized() {
return Err(LimboError::InternalError(
"Database is empty, header does not exist - page 1 should've been allocated before this".to_string(),
));
}
IOResult::IO => Ok(IOResult::IO),
let (page, _c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
if page.is_locked() {
return Ok(IOResult::IO);
}
turso_assert!(
page.get().id == DatabaseHeader::PAGE_ID,
"incorrect header page id"
);
pager.add_dirty(&page);
Ok(IOResult::Done(Self(page)))
}
pub fn borrow_mut(&self) -> RefMut<'_, DatabaseHeader> {
let content = self.0.get_contents();
RefMut::map(content.buffer.borrow_mut(), |buffer| {
bytemuck::from_bytes_mut::<DatabaseHeader>(
&mut buffer.as_mut_slice()[0..DatabaseHeader::SIZE],
)
})
}
}

View File

@@ -2,9 +2,8 @@ use crate::result::LimboResult;
use crate::storage::btree::BTreePageInner;
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::header_accessor;
use crate::storage::sqlite3_ondisk::{
self, parse_wal_frame_header, DatabaseHeader, PageContent, PageType, DEFAULT_PAGE_SIZE,
self, parse_wal_frame_header, DatabaseHeader, PageContent, PageSize, PageType,
};
use crate::storage::wal::{CheckpointResult, Wal};
use crate::types::{IOResult, WalInsertInfo};
@@ -21,8 +20,9 @@ use std::sync::{Arc, Mutex};
use tracing::{instrument, trace, Level};
use super::btree::{btree_init_page, BTreePage};
use super::header_accessor::{HeaderRef, HeaderRefMut};
use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey};
use super::sqlite3_ondisk::{begin_write_btree_page, DATABASE_HEADER_SIZE};
use super::sqlite3_ondisk::begin_write_btree_page;
use super::wal::CheckpointMode;
#[cfg(not(feature = "omit_autovacuum"))]
@@ -473,10 +473,8 @@ impl Pager {
#[cfg(not(feature = "omit_autovacuum"))]
pub fn ptrmap_get(&self, target_page_num: u32) -> Result<IOResult<Option<PtrmapEntry>>> {
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
let configured_page_size = match header_accessor::get_page_size_async(self)? {
IOResult::Done(size) => size as usize,
IOResult::IO => return Ok(IOResult::IO),
};
let configured_page_size =
return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
if target_page_num < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(target_page_num, configured_page_size)
@@ -559,10 +557,7 @@ impl Pager {
parent_page_no
);
let page_size = match header_accessor::get_page_size_async(self)? {
IOResult::Done(size) => size as usize,
IOResult::IO => return Ok(IOResult::IO),
};
let page_size = return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(db_page_no_to_update, page_size)
@@ -658,21 +653,19 @@ impl Pager {
Ok(IOResult::Done(page.get().get().id as u32))
}
AutoVacuumMode::Full => {
let mut root_page_num =
match header_accessor::get_vacuum_mode_largest_root_page_async(self)? {
IOResult::Done(value) => value,
IOResult::IO => return Ok(IOResult::IO),
};
let (mut root_page_num, page_size) =
return_if_io!(self.with_header(|header| {
(
header.vacuum_mode_largest_root_page.get(),
header.page_size.get(),
)
}));
assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled
root_page_num += 1;
assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented
let page_size = match header_accessor::get_page_size_async(self)? {
IOResult::Done(size) => size as usize,
IOResult::IO => return Ok(IOResult::IO),
};
while is_ptrmap_page(root_page_num, page_size) {
while is_ptrmap_page(root_page_num, page_size as usize) {
root_page_num += 1;
}
assert!(root_page_num >= 3); // the very first root page is page 3
@@ -745,14 +738,18 @@ impl Pager {
/// The usable size of a page might be an odd number. However, the usable size is not allowed to be less than 480.
/// In other words, if the page size is 512, then the reserved space size cannot exceed 32.
pub fn usable_space(&self) -> usize {
let page_size = *self
.page_size
.get()
.get_or_insert_with(|| header_accessor::get_page_size(self).unwrap());
let page_size = *self.page_size.get().get_or_insert_with(|| {
self.io
.block(|| self.with_header(|header| header.page_size))
.unwrap_or_default()
.get()
});
let reserved_space = *self
.reserved_space
.get_or_init(|| header_accessor::get_reserved_space(self).unwrap());
let reserved_space = *self.reserved_space.get_or_init(|| {
self.io
.block(|| self.with_header(|header| header.reserved_space))
.unwrap_or_default()
});
(page_size as usize) - (reserved_space as usize)
}
@@ -1080,7 +1077,10 @@ impl Pager {
};
let db_size = {
let db_size = header_accessor::get_database_size(self)?;
let db_size = self
.io
.block(|| self.with_header(|header| header.database_size))?
.get();
if is_last_frame {
db_size
} else {
@@ -1313,8 +1313,11 @@ impl Pager {
if checkpoint_result.everything_backfilled()
&& checkpoint_result.num_checkpointed_frames != 0
{
let db_size = header_accessor::get_database_size(self)?;
let page_size = self.page_size.get().unwrap_or(DEFAULT_PAGE_SIZE);
let db_size = self
.io
.block(|| self.with_header(|header| header.database_size))?
.get();
let page_size = self.page_size.get().unwrap_or(PageSize::DEFAULT as u32);
let expected = (db_size * page_size) as u64;
if expected < self.db_file.size()? {
self.io.wait_for_completion(self.db_file.truncate(
@@ -1354,12 +1357,15 @@ impl Pager {
const TRUNK_PAGE_NEXT_PAGE_OFFSET: usize = 0; // Offset to next trunk page pointer
const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count
let header_ref = self.io.block(|| HeaderRefMut::from_pager(self))?;
let mut header = header_ref.borrow_mut();
let mut state = self.free_page_state.borrow_mut();
tracing::debug!(?state);
loop {
match &mut *state {
FreePageState::Start => {
if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize {
if page_id < 2 || page_id > header.database_size.get() as usize {
return Err(LimboError::Corrupt(format!(
"Invalid page number {page_id} for free operation"
)));
@@ -1385,12 +1391,9 @@ impl Pager {
(page, Some(c))
}
};
header_accessor::set_freelist_pages(
self,
header_accessor::get_freelist_pages(self)? + 1,
)?;
header.freelist_pages = (header.freelist_pages.get() + 1).into();
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
let trunk_page_id = header.freelist_trunk_page.get();
if trunk_page_id != 0 {
*state = FreePageState::AddToTrunk {
@@ -1402,7 +1405,7 @@ impl Pager {
}
}
FreePageState::AddToTrunk { page, trunk_page } => {
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
let trunk_page_id = header.freelist_trunk_page.get();
if trunk_page.is_none() {
// Add as leaf to current trunk
let (page, c) = self.read_page(trunk_page_id as usize)?;
@@ -1419,7 +1422,7 @@ impl Pager {
// Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE
let max_free_list_entries =
(self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS;
(header.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS;
if number_of_leaf_pages < max_free_list_entries as u32 {
turso_assert!(
@@ -1449,7 +1452,7 @@ impl Pager {
turso_assert!(page.get().id == page_id, "page has unexpected id");
self.add_dirty(page);
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
let trunk_page_id = header.freelist_trunk_page.get();
let contents = page.get().contents.as_mut().unwrap();
// Point to previous trunk
@@ -1457,7 +1460,7 @@ impl Pager {
// Zero leaf count
contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
// Update page 1 to point to new trunk
header_accessor::set_freelist_trunk_page(self, page_id as u32)?;
header.freelist_trunk_page = (page_id as u32).into();
// Clear flags
page.clear_uptodate();
break;
@@ -1476,9 +1479,12 @@ impl Pager {
tracing::trace!("allocate_page1(Start)");
self.db_state.set(DbState::Initializing);
let mut default_header = DatabaseHeader::default();
default_header.database_size += 1;
assert_eq!(default_header.database_size.get(), 0);
default_header.database_size = 1.into();
if let Some(size) = self.page_size.get() {
default_header.update_page_size(size);
default_header.page_size = PageSize::new(size).expect("page size");
}
let page = allocate_new_page(1, &self.buffer_pool, 0);
@@ -1495,8 +1501,8 @@ impl Pager {
btree_init_page(
&page1,
PageType::TableLeaf,
DATABASE_HEADER_SIZE,
(default_header.get_page_size() - default_header.reserved_space as u32) as u16,
DatabaseHeader::SIZE,
(default_header.page_size.get() - default_header.reserved_space as u32) as u16,
);
let write_counter = Rc::new(RefCell::new(0));
let c = begin_write_btree_page(self, &page1.get(), write_counter.clone())?;
@@ -1553,12 +1559,15 @@ impl Pager {
const FREELIST_TRUNK_OFFSET_LEAF_COUNT: usize = 4;
const FREELIST_TRUNK_OFFSET_FIRST_LEAF: usize = 8;
let header_ref = self.io.block(|| HeaderRefMut::from_pager(self))?;
let mut header = header_ref.borrow_mut();
loop {
let mut state = self.allocate_page_state.borrow_mut();
tracing::debug!("allocate_page(state={:?})", state);
match &mut *state {
AllocatePageState::Start => {
let old_db_size = header_accessor::get_database_size(self)?;
let old_db_size = header.database_size.get();
#[cfg(not(feature = "omit_autovacuum"))]
let mut new_db_size = old_db_size;
#[cfg(feature = "omit_autovacuum")]
@@ -1571,10 +1580,7 @@ impl Pager {
// - autovacuum is enabled
// - the last page is a pointer map page
if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full)
&& is_ptrmap_page(
new_db_size + 1,
header_accessor::get_page_size(self)? as usize,
)
&& is_ptrmap_page(new_db_size + 1, header.page_size.get() as usize)
{
// we will allocate a ptrmap page, so increment size
new_db_size += 1;
@@ -1595,8 +1601,7 @@ impl Pager {
}
}
let first_freelist_trunk_page_id =
header_accessor::get_freelist_trunk_page(self)?;
let first_freelist_trunk_page_id = header.freelist_trunk_page.get();
if first_freelist_trunk_page_id == 0 {
*state = AllocatePageState::AllocateNewPage {
current_db_size: new_db_size,
@@ -1649,11 +1654,8 @@ impl Pager {
// Freelist is not empty, so we can reuse the trunk itself as a new page
// and update the database's first freelist trunk page to the next trunk page.
header_accessor::set_freelist_trunk_page(self, next_trunk_page_id)?;
header_accessor::set_freelist_pages(
self,
header_accessor::get_freelist_pages(self)? - 1,
)?;
header.freelist_trunk_page = next_trunk_page_id.into();
header.freelist_pages = (header.freelist_pages.get() + 1).into();
self.add_dirty(trunk_page);
// zero out the page
turso_assert!(
@@ -1736,11 +1738,7 @@ impl Pager {
);
self.add_dirty(trunk_page);
header_accessor::set_freelist_pages(
self,
header_accessor::get_freelist_pages(self)? - 1,
)?;
header.freelist_pages = (header.freelist_pages.get() - 1).into();
*state = AllocatePageState::Start;
return Ok(IOResult::Done(leaf_page));
}
@@ -1766,7 +1764,7 @@ impl Pager {
Ok(_) => {}
};
}
header_accessor::set_database_size(self, new_db_size)?;
header.database_size = new_db_size.into();
*state = AllocatePageState::Start;
return Ok(IOResult::Done(page));
}
@@ -1796,12 +1794,6 @@ impl Pager {
Ok(())
}
pub fn usable_size(&self) -> usize {
let page_size = header_accessor::get_page_size(self).unwrap_or_default() as u32;
let reserved_space = header_accessor::get_reserved_space(self).unwrap_or_default() as u32;
(page_size - reserved_space) as usize
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn rollback(
&self,
@@ -1840,6 +1832,22 @@ impl Pager {
});
self.allocate_page_state.replace(AllocatePageState::Start);
}
/// Run `f` with a shared reference to the in-memory `DatabaseHeader`.
///
/// The reference is obtained through [`HeaderRef`], which reads the header
/// in place from page 1's buffer (no copy of the 100 header bytes is made).
///
/// Returns `Ok(IOResult::IO)` when page 1 is not yet available and I/O was
/// scheduled; the caller is expected to retry (e.g. via `io.block`).
///
/// `f` is called at most once, so any `FnOnce` closure is accepted.
pub fn with_header<T>(&self, f: impl FnOnce(&DatabaseHeader) -> T) -> Result<IOResult<T>> {
    let IOResult::Done(header_ref) = HeaderRef::from_pager(self)? else {
        return Ok(IOResult::IO);
    };
    let header = header_ref.borrow();
    Ok(IOResult::Done(f(&header)))
}
/// Run `f` with a unique, mutable reference to the in-memory `DatabaseHeader`.
///
/// Mutations are applied directly to page 1's buffer through
/// [`HeaderRefMut`]; NOTE(review): `HeaderRefMut::from_pager` presumably also
/// marks page 1 dirty so the change is persisted — confirm at its definition.
///
/// Returns `Ok(IOResult::IO)` when page 1 is not yet available and I/O was
/// scheduled; the caller is expected to retry (e.g. via `io.block`).
///
/// `f` is called at most once, so any `FnOnce` closure is accepted.
pub fn with_header_mut<T>(&self, f: impl FnOnce(&mut DatabaseHeader) -> T) -> Result<IOResult<T>> {
    let IOResult::Done(header_ref) = HeaderRefMut::from_pager(self)? else {
        return Ok(IOResult::IO);
    };
    let mut header = header_ref.borrow_mut();
    Ok(IOResult::Done(f(&mut header)))
}
}
pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
@@ -1917,7 +1925,7 @@ impl CreateBTreeFlags {
*/
#[cfg(not(feature = "omit_autovacuum"))]
mod ptrmap {
use crate::{storage::sqlite3_ondisk::MIN_PAGE_SIZE, LimboError, Result};
use crate::{storage::sqlite3_ondisk::PageSize, LimboError, Result};
// Constants
pub const PTRMAP_ENTRY_SIZE: usize = 5;
@@ -1985,14 +1993,14 @@ mod ptrmap {
/// Calculates how many database pages are mapped by a single pointer map page.
/// This is based on the total page size, as ptrmap pages are filled with entries.
pub fn entries_per_ptrmap_page(page_size: usize) -> usize {
assert!(page_size >= MIN_PAGE_SIZE as usize);
assert!(page_size >= PageSize::MIN as usize);
page_size / PTRMAP_ENTRY_SIZE
}
/// Calculates the cycle length of pointer map pages
/// The cycle length is the number of database pages that are mapped by a single pointer map page.
pub fn ptrmap_page_cycle_length(page_size: usize) -> usize {
assert!(page_size >= MIN_PAGE_SIZE as usize);
assert!(page_size >= PageSize::MIN as usize);
(page_size / PTRMAP_ENTRY_SIZE) + 1
}
@@ -2102,7 +2110,7 @@ mod ptrmap_tests {
use crate::storage::database::{DatabaseFile, DatabaseStorage};
use crate::storage::page_cache::DumbLruPageCache;
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::MIN_PAGE_SIZE;
use crate::storage::sqlite3_ondisk::PageSize;
use crate::storage::wal::{WalFile, WalFileShared};
pub fn run_until_done<T>(
@@ -2154,7 +2162,12 @@ mod ptrmap_tests {
)
.unwrap();
run_until_done(|| pager.allocate_page1(), &pager).unwrap();
header_accessor::set_vacuum_mode_largest_root_page(&pager, 1).unwrap();
pager
.io
.block(|| {
pager.with_header_mut(|header| header.vacuum_mode_largest_root_page = 1.into())
})
.unwrap();
pager.set_auto_vacuum_mode(AutoVacuumMode::Full);
// Allocate all the pages as btree root pages
@@ -2194,7 +2207,11 @@ mod ptrmap_tests {
// Ensure that the database header size is correctly reflected
assert_eq!(
header_accessor::get_database_size(&pager).unwrap(),
pager
.io
.block(|| pager.with_header(|header| header.database_size))
.unwrap()
.get(),
initial_db_pages + 2
); // (1+1) -> (header + ptrmap)
@@ -2210,7 +2227,7 @@ mod ptrmap_tests {
#[test]
fn test_is_ptrmap_page_logic() {
let page_size = MIN_PAGE_SIZE as usize;
let page_size = PageSize::MIN as usize;
let n_data_pages = entries_per_ptrmap_page(page_size);
assert_eq!(n_data_pages, 102); // 512/5 = 102
@@ -2228,7 +2245,7 @@ mod ptrmap_tests {
#[test]
fn test_get_ptrmap_page_no() {
let page_size = MIN_PAGE_SIZE as usize; // Maps 103 data pages
let page_size = PageSize::MIN as usize; // Maps 103 data pages
// Test pages mapped by P0 (page 2)
assert_eq!(get_ptrmap_page_no_for_db_page(3, page_size), 2); // D(3) -> P0(2)
@@ -2248,7 +2265,7 @@ mod ptrmap_tests {
#[test]
fn test_get_ptrmap_offset() {
let page_size = MIN_PAGE_SIZE as usize; // Maps 103 data pages
let page_size = PageSize::MIN as usize; // Maps 103 data pages
assert_eq!(get_ptrmap_offset_in_page(3, 2, page_size).unwrap(), 0);
assert_eq!(

View File

@@ -43,6 +43,8 @@
#![allow(clippy::arc_with_non_send_sync)]
use bytemuck::{Pod, Zeroable};
use pack1::{I32BE, U16BE, U32BE};
use tracing::{instrument, Level};
use super::pager::PageRef;
@@ -69,26 +71,6 @@ use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
/// The size of the database header in bytes.
pub const DATABASE_HEADER_SIZE: usize = 100;
// DEFAULT_CACHE_SIZE negative values mean that we store the amount of pages a XKiB of memory can hold.
// We can calculate "real" cache size by diving by page size.
pub const DEFAULT_CACHE_SIZE: i32 = -2000;
// Minimum number of pages that cache can hold.
pub const MIN_PAGE_CACHE_SIZE: usize = 10;
/// The minimum page size in bytes.
pub const MIN_PAGE_SIZE: u32 = 512;
/// The maximum page size in bytes.
pub const MAX_PAGE_SIZE: u32 = 65536;
/// The default page size in bytes.
pub const DEFAULT_PAGE_SIZE: u32 = 4096;
pub const DATABASE_HEADER_PAGE_ID: usize = 1;
/// The minimum size of a cell in bytes.
pub const MINIMUM_CELL_SIZE: usize = 4;
@@ -97,116 +79,238 @@ pub const INTERIOR_PAGE_HEADER_SIZE_BYTES: usize = 12;
pub const LEAF_PAGE_HEADER_SIZE_BYTES: usize = 8;
pub const LEFT_CHILD_PTR_SIZE_BYTES: usize = 4;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u32)]
pub enum DatabaseEncoding {
Utf8 = 1,
Utf16Le = 2,
Utf16Be = 3,
}
#[derive(PartialEq, Eq, Zeroable, Pod, Clone, Copy, Debug)]
#[repr(transparent)]
/// Read/Write file format version.
pub struct PageSize(U16BE);
impl TryFrom<u32> for DatabaseEncoding {
type Error = LimboError;
impl PageSize {
pub const MIN: u32 = 512;
pub const MAX: u32 = 65536;
pub const DEFAULT: u16 = 4096;
fn try_from(value: u32) -> Result<Self> {
match value {
1 => Ok(Self::Utf8),
2 => Ok(Self::Utf16Le),
3 => Ok(Self::Utf16Be),
_ => Err(LimboError::Corrupt(format!("Invalid encoding: {value}"))),
pub const fn new(size: u32) -> Option<Self> {
if size < PageSize::MIN || size > PageSize::MAX {
return None;
}
// Page size must be a power of two.
if size.count_ones() != 1 {
return None;
}
if size == PageSize::MAX {
return Some(Self(U16BE::new(1)));
}
Some(Self(U16BE::new(size as u16)))
}
pub const fn get(self) -> u32 {
match self.0.get() {
1 => Self::MAX,
v => v as u32,
}
}
}
impl From<DatabaseEncoding> for &'static str {
fn from(encoding: DatabaseEncoding) -> Self {
match encoding {
DatabaseEncoding::Utf8 => "UTF-8",
DatabaseEncoding::Utf16Le => "UTF-16le",
DatabaseEncoding::Utf16Be => "UTF-16be",
impl Default for PageSize {
fn default() -> Self {
Self(U16BE::new(Self::DEFAULT))
}
}
#[derive(PartialEq, Eq, Zeroable, Pod, Clone, Copy, Debug)]
#[repr(transparent)]
/// Suggested page-cache size, as stored big-endian in the database header.
///
/// A stored value of 0 is the on-disk sentinel for "use the default";
/// `new`/`get` translate between `DEFAULT` and that sentinel.
pub struct CacheSize(I32BE);

impl CacheSize {
    // The negative value means that we store the amount of pages a XKiB of
    // memory can hold. We can calculate the "real" cache size by dividing by
    // the page size.
    pub const DEFAULT: i32 = -2000;
    // Minimum number of pages that cache can hold.
    pub const MIN: i64 = 10;
    // SQLite uses this value as threshold for maximum cache size
    pub const MAX_SAFE: i64 = 2147450880;

    /// Wrap a cache size, normalizing `DEFAULT` to the on-disk sentinel 0.
    // NOTE(review): `new(0)` also stores 0 and thus reads back as `DEFAULT`
    // via `get` — presumably intentional, since 0 has no other meaning here.
    pub const fn new(size: i32) -> Self {
        match size {
            Self::DEFAULT => Self(I32BE::new(0)),
            v => Self(I32BE::new(v)),
        }
    }

    /// Read the cache size, mapping the on-disk sentinel 0 back to `DEFAULT`.
    pub const fn get(self) -> i32 {
        match self.0.get() {
            0 => Self::DEFAULT,
            v => v,
        }
    }
}
/// The database header.
/// The first 100 bytes of the database file comprise the database file header.
/// The database file header is divided into fields as shown by the table below.
/// All multibyte fields in the database file header are stored with the most significant byte first (big-endian).
#[derive(Debug, Clone)]
impl Default for CacheSize {
fn default() -> Self {
Self(I32BE::new(Self::DEFAULT))
}
}
#[derive(PartialEq, Eq, Zeroable, Pod, Clone, Copy)]
#[repr(transparent)]
/// Read/Write file format version.
pub struct Version(u8);

impl Version {
    #![allow(non_upper_case_globals)]
    /// File format version 1: legacy (rollback-journal) mode.
    // `pub` for consistency with the sibling `TextEncoding` variant consts.
    pub const Legacy: Self = Self(1);
    /// File format version 2: write-ahead log (WAL) mode.
    pub const Wal: Self = Self(2);
}
impl std::fmt::Debug for Version {
    /// Renders the two known versions by name; anything else is flagged invalid.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if *self == Self::Legacy {
            f.write_str("Version::Legacy")
        } else if *self == Self::Wal {
            f.write_str("Version::Wal")
        } else {
            let Self(v) = *self;
            write!(f, "Version::Invalid({v})")
        }
    }
}
#[derive(PartialEq, Eq, Zeroable, Pod, Clone, Copy)]
#[repr(transparent)]
/// Text encoding.
///
/// Stored big-endian in the database header: 1 = UTF-8, 2 = UTF-16le,
/// 3 = UTF-16be.
pub struct TextEncoding(U32BE);

impl TextEncoding {
    #![allow(non_upper_case_globals)]
    pub const Utf8: Self = Self(U32BE::new(1));
    pub const Utf16Le: Self = Self(U32BE::new(2));
    pub const Utf16Be: Self = Self(U32BE::new(3));
}
impl std::fmt::Display for TextEncoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
Self::Utf8 => f.write_str("UTF-8"),
Self::Utf16Le => f.write_str("UTF-16le"),
Self::Utf16Be => f.write_str("UTF-16be"),
Self(v) => write!(f, "TextEncoding::Invalid({})", v.get()),
}
}
}
impl std::fmt::Debug for TextEncoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
Self::Utf8 => f.write_str("TextEncoding::Utf8"),
Self::Utf16Le => f.write_str("TextEncoding::Utf16Le"),
Self::Utf16Be => f.write_str("TextEncoding::Utf16Be"),
Self(v) => write!(f, "TextEncoding::Invalid({})", v.get()),
}
}
}
impl Default for TextEncoding {
    /// New databases default to UTF-8 (header value 1).
    fn default() -> Self {
        Self::Utf8
    }
}
#[derive(Pod, Zeroable, Clone, Copy, Debug)]
#[repr(C, packed)]
/// Database Header Format
pub struct DatabaseHeader {
/// The header string: "SQLite format 3\0"
/// b"SQLite format 3\0"
pub magic: [u8; 16],
/// The database page size in bytes. Must be a power of two between 512 and 32768 inclusive,
/// or the value 1 representing a page size of 65536.
pub page_size: u16,
/// Page size in bytes. Must be a power of two between 512 and 32768 inclusive, or the value 1 representing a page size of 65536.
pub page_size: PageSize,
/// File format write version. 1 for legacy; 2 for WAL.
pub write_version: u8,
pub write_version: Version,
/// File format read version. 1 for legacy; 2 for WAL.
pub read_version: u8,
pub read_version: Version,
/// Bytes of unused "reserved" space at the end of each page. Usually 0.
/// SQLite has the ability to set aside a small number of extra bytes at the end of every page for use by extensions.
/// These extra bytes are used, for example, by the SQLite Encryption Extension to store a nonce and/or
/// cryptographic checksum associated with each page.
pub reserved_space: u8,
/// Maximum embedded payload fraction. Must be 64.
pub max_embed_frac: u8,
/// Minimum embedded payload fraction. Must be 32.
pub min_embed_frac: u8,
/// Leaf payload fraction. Must be 32.
pub min_leaf_frac: u8,
/// File change counter, incremented when database is modified.
pub change_counter: u32,
pub leaf_frac: u8,
/// File change counter.
pub change_counter: U32BE,
/// Size of the database file in pages. The "in-header database size".
pub database_size: u32,
pub database_size: U32BE,
/// Page number of the first freelist trunk page.
pub freelist_trunk_page: u32,
pub freelist_trunk_page: U32BE,
/// Total number of freelist pages.
pub freelist_pages: u32,
/// The schema cookie. Incremented when the database schema changes.
pub schema_cookie: u32,
/// The schema format number. Supported formats are 1, 2, 3, and 4.
pub schema_format: u32,
pub freelist_pages: U32BE,
/// The schema cookie.
pub schema_cookie: U32BE,
/// The schema format number. Supported schema formats are 1, 2, 3, and 4.
pub schema_format: U32BE,
/// Default page cache size.
pub default_page_cache_size: i32,
/// The page number of the largest root b-tree page when in auto-vacuum or
/// incremental-vacuum modes, or zero otherwise.
pub vacuum_mode_largest_root_page: u32,
/// The database text encoding. 1=UTF-8, 2=UTF-16le, 3=UTF-16be.
pub text_encoding: u32,
pub default_page_cache_size: CacheSize,
/// The page number of the largest root b-tree page when in auto-vacuum or incremental-vacuum modes, or zero otherwise.
pub vacuum_mode_largest_root_page: U32BE,
/// Text encoding.
pub text_encoding: TextEncoding,
/// The "user version" as read and set by the user_version pragma.
pub user_version: i32,
pub user_version: I32BE,
/// True (non-zero) for incremental-vacuum mode. False (zero) otherwise.
pub incremental_vacuum_enabled: u32,
pub incremental_vacuum_enabled: U32BE,
/// The "Application ID" set by PRAGMA application_id.
pub application_id: u32,
pub application_id: I32BE,
/// Reserved for expansion. Must be zero.
pub reserved_for_expansion: [u8; 20],
_padding: [u8; 20],
/// The version-valid-for number.
pub version_valid_for: u32,
pub version_valid_for: U32BE,
/// SQLITE_VERSION_NUMBER
pub version_number: u32,
pub version_number: U32BE,
}
impl DatabaseHeader {
    /// Page number of the page that embeds the header (always page 1).
    pub const PAGE_ID: usize = 1;
    /// Size of the on-disk header in bytes.
    pub const SIZE: usize = size_of::<Self>();
    // Compile-time guard: the `repr(C, packed)` struct must stay exactly
    // 100 bytes so it can be memcpy'd to/from the file format header.
    const _CHECK: () = {
        assert!(Self::SIZE == 100);
    };

    /// Bytes of each page usable for content: page size minus the
    /// reserved space set aside at the end of every page.
    pub fn usable_space(self) -> usize {
        (self.page_size.get() as usize) - (self.reserved_space as usize)
    }
}
impl Default for DatabaseHeader {
    /// Header for a freshly created database.
    fn default() -> Self {
        Self {
            magic: *b"SQLite format 3\0",
            page_size: Default::default(),
            // New databases are created in WAL mode (version 2).
            write_version: Version::Wal,
            read_version: Version::Wal,
            reserved_space: 0,
            // These three payload fractions are fixed by the file format.
            max_embed_frac: 64,
            min_embed_frac: 32,
            leaf_frac: 32,
            change_counter: U32BE::new(1),
            database_size: U32BE::new(0),
            freelist_trunk_page: U32BE::new(0),
            freelist_pages: U32BE::new(0),
            schema_cookie: U32BE::new(0),
            schema_format: U32BE::new(4), // latest format, new sqlite3 databases use this format
            default_page_cache_size: Default::default(),
            vacuum_mode_largest_root_page: U32BE::new(0),
            text_encoding: TextEncoding::Utf8,
            user_version: I32BE::new(0),
            incremental_vacuum_enabled: U32BE::new(0),
            application_id: I32BE::new(0),
            _padding: [0; 20],
            // Encodes SQLite version 3.47.0 (major*1_000_000 + minor*1_000 + patch).
            version_valid_for: U32BE::new(3047000),
            version_number: U32BE::new(3047000),
        }
    }
}
pub const WAL_HEADER_SIZE: usize = 32;
@@ -283,90 +387,6 @@ impl WalFrameHeader {
}
}
impl Default for DatabaseHeader {
fn default() -> Self {
Self {
magic: *b"SQLite format 3\0",
page_size: DEFAULT_PAGE_SIZE as u16,
write_version: 2,
read_version: 2,
reserved_space: 0,
max_embed_frac: 64,
min_embed_frac: 32,
min_leaf_frac: 32,
change_counter: 1,
database_size: 0,
freelist_trunk_page: 0,
freelist_pages: 0,
schema_cookie: 0,
schema_format: 4, // latest format, new sqlite3 databases use this format
default_page_cache_size: DEFAULT_CACHE_SIZE,
vacuum_mode_largest_root_page: 0,
text_encoding: 1, // utf-8
user_version: 0,
incremental_vacuum_enabled: 0,
application_id: 0,
reserved_for_expansion: [0; 20],
version_valid_for: 3047000,
version_number: 3047000,
}
}
}
impl DatabaseHeader {
pub fn update_page_size(&mut self, size: u32) {
if !is_valid_page_size(size) {
return;
}
self.page_size = if size == MAX_PAGE_SIZE {
1u16
} else {
size as u16
};
}
pub fn get_page_size(&self) -> u32 {
if self.page_size == 1 {
MAX_PAGE_SIZE
} else {
self.page_size as u32
}
}
}
pub fn is_valid_page_size(size: u32) -> bool {
(MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&size) && (size & (size - 1)) == 0
}
pub fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) {
buf[0..16].copy_from_slice(&header.magic);
buf[16..18].copy_from_slice(&header.page_size.to_be_bytes());
buf[18] = header.write_version;
buf[19] = header.read_version;
buf[20] = header.reserved_space;
buf[21] = header.max_embed_frac;
buf[22] = header.min_embed_frac;
buf[23] = header.min_leaf_frac;
buf[24..28].copy_from_slice(&header.change_counter.to_be_bytes());
buf[28..32].copy_from_slice(&header.database_size.to_be_bytes());
buf[32..36].copy_from_slice(&header.freelist_trunk_page.to_be_bytes());
buf[36..40].copy_from_slice(&header.freelist_pages.to_be_bytes());
buf[40..44].copy_from_slice(&header.schema_cookie.to_be_bytes());
buf[44..48].copy_from_slice(&header.schema_format.to_be_bytes());
buf[48..52].copy_from_slice(&header.default_page_cache_size.to_be_bytes());
buf[52..56].copy_from_slice(&header.vacuum_mode_largest_root_page.to_be_bytes());
buf[56..60].copy_from_slice(&header.text_encoding.to_be_bytes());
buf[60..64].copy_from_slice(&header.user_version.to_be_bytes());
buf[64..68].copy_from_slice(&header.incremental_vacuum_enabled.to_be_bytes());
buf[68..72].copy_from_slice(&header.application_id.to_be_bytes());
buf[72..92].copy_from_slice(&header.reserved_for_expansion);
buf[92..96].copy_from_slice(&header.version_valid_for.to_be_bytes());
buf[96..100].copy_from_slice(&header.version_number.to_be_bytes());
}
#[repr(u8)]
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum PageType {
@@ -532,7 +552,7 @@ impl PageContent {
pub fn cell_content_area(&self) -> u32 {
let offset = self.read_u16(BTREE_CELL_CONTENT_AREA);
if offset == 0 {
MAX_PAGE_SIZE
PageSize::MAX
} else {
offset as u32
}
@@ -734,7 +754,7 @@ impl PageContent {
pub fn write_database_header(&self, header: &DatabaseHeader) {
let buf = self.as_ptr();
write_header_to_buf(buf, header);
buf[0..DatabaseHeader::SIZE].copy_from_slice(bytemuck::bytes_of(header));
}
pub fn debug_print_freelist(&self, usable_space: u16) {
@@ -794,8 +814,8 @@ pub fn finish_read_page(
page: PageRef,
) -> Result<()> {
tracing::trace!(page_idx);
let pos = if page_idx == DATABASE_HEADER_PAGE_ID {
DATABASE_HEADER_SIZE
let pos = if page_idx == DatabaseHeader::PAGE_ID {
DatabaseHeader::SIZE
} else {
0
};
@@ -1563,9 +1583,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
let mut cumulative_checksum = (header_locked.checksum_1, header_locked.checksum_2);
let page_size_u32 = header_locked.page_size;
if !(MIN_PAGE_SIZE..=MAX_PAGE_SIZE).contains(&page_size_u32)
|| page_size_u32.count_ones() != 1
{
if PageSize::new(page_size_u32).is_none() {
panic!("Invalid page size in WAL header: {page_size_u32}");
}
let page_size = page_size_u32 as usize;

View File

@@ -10,18 +10,17 @@ use turso_sqlite3_parser::ast::{PragmaName, QualifiedName};
use crate::pragma::pragma_for;
use crate::schema::Schema;
use crate::storage::pager::AutoVacuumMode;
use crate::storage::sqlite3_ondisk::{DatabaseEncoding, MIN_PAGE_CACHE_SIZE};
use crate::storage::sqlite3_ondisk::CacheSize;
use crate::storage::wal::CheckpointMode;
use crate::translate::schema::translate_create_table;
use crate::util::{normalize_ident, parse_signed_number, parse_string};
use crate::util::{normalize_ident, parse_signed_number, parse_string, IOExt as _};
use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts};
use crate::vdbe::insn::{Cookie, Insn};
use crate::{bail_parse_error, storage, CaptureDataChangesMode, LimboError, Value};
use crate::{bail_parse_error, CaptureDataChangesMode, LimboError, Value};
use std::str::FromStr;
use strum::IntoEnumIterator;
use super::integrity_check::translate_integrity_check;
use crate::storage::header_accessor;
use crate::storage::pager::Pager;
use crate::translate::emitter::TransactionMode;
@@ -311,15 +310,12 @@ fn query_pragma(
Ok((program, TransactionMode::None))
}
PragmaName::Encoding => {
let encoding: &str = if !pager.db_state.is_initialized() {
DatabaseEncoding::Utf8
} else {
let encoding: DatabaseEncoding =
header_accessor::get_text_encoding(&pager)?.try_into()?;
encoding
}
.into();
program.emit_string8(encoding.into(), register);
let encoding = pager
.io
.block(|| pager.with_header(|header| header.text_encoding))
.unwrap_or_default()
.to_string();
program.emit_string8(encoding, register);
program.emit_result_row(register, 1);
program.add_pragma_result_column(pragma.to_string());
Ok((program, TransactionMode::None))
@@ -433,7 +429,10 @@ fn query_pragma(
}
PragmaName::PageSize => {
program.emit_int(
header_accessor::get_page_size(&pager).unwrap_or(connection.get_page_size()) as i64,
pager
.io
.block(|| pager.with_header(|header| header.page_size.get()))
.unwrap_or(connection.get_page_size()) as i64,
register,
);
program.emit_result_row(register, 1);
@@ -484,7 +483,11 @@ fn update_auto_vacuum_mode(
largest_root_page_number: u32,
pager: Rc<Pager>,
) -> crate::Result<()> {
header_accessor::set_vacuum_mode_largest_root_page(&pager, largest_root_page_number)?;
pager.io.block(|| {
pager.with_header_mut(|header| {
header.vacuum_mode_largest_root_page = largest_root_page_number.into()
})
})?;
pager.set_auto_vacuum_mode(auto_vacuum_mode);
Ok(())
}
@@ -498,8 +501,11 @@ fn update_cache_size(
let mut cache_size = if cache_size_unformatted < 0 {
let kb = cache_size_unformatted.abs().saturating_mul(1024);
let page_size = header_accessor::get_page_size(&pager)
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as i64;
let page_size = pager
.io
.block(|| pager.with_header(|header| header.page_size))
.unwrap_or_default()
.get() as i64;
if page_size == 0 {
return Err(LimboError::InternalError(
"Page size cannot be zero".to_string(),
@@ -510,10 +516,7 @@ fn update_cache_size(
value
};
// SQLite uses this value as threshold for maximum cache size
const MAX_SAFE_CACHE_SIZE: i64 = 2147450880;
if cache_size > MAX_SAFE_CACHE_SIZE {
if cache_size > CacheSize::MAX_SAFE {
cache_size = 0;
cache_size_unformatted = 0;
}
@@ -523,19 +526,17 @@ fn update_cache_size(
cache_size_unformatted = 0;
}
let cache_size_usize = cache_size as usize;
let final_cache_size = if cache_size_usize < MIN_PAGE_CACHE_SIZE {
cache_size_unformatted = MIN_PAGE_CACHE_SIZE as i64;
MIN_PAGE_CACHE_SIZE
let final_cache_size = if cache_size < CacheSize::MIN {
cache_size_unformatted = CacheSize::MIN;
CacheSize::MIN
} else {
cache_size_usize
cache_size
};
connection.set_cache_size(cache_size_unformatted as i32);
pager
.change_page_cache_size(final_cache_size)
.change_page_cache_size(final_cache_size as usize)
.map_err(|e| LimboError::InternalError(format!("Failed to update page cache size: {e}")))?;
Ok(())

View File

@@ -1,5 +1,4 @@
#![allow(unused)]
use crate::storage::header_accessor::get_schema_cookie;
use crate::translate::expr::WalkControl;
use crate::types::IOResult;
use crate::{

View File

@@ -7,13 +7,12 @@ use crate::storage::page_cache::DumbLruPageCache;
use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState};
use crate::storage::sqlite3_ondisk::read_varint;
use crate::storage::wal::DummyWAL;
use crate::storage::{self, header_accessor};
use crate::translate::collate::CollationSeq;
use crate::types::{
compare_immutable, compare_records_generic, Extendable, ImmutableRecord, RawSlice, SeekResult,
Text, TextRef, TextSubtype,
};
use crate::util::normalize_ident;
use crate::util::{normalize_ident, IOExt as _};
use crate::vdbe::insn::InsertFlags;
use crate::vdbe::registers_to_ref_values;
use crate::vector::{vector_concat, vector_slice};
@@ -3590,8 +3589,12 @@ pub fn op_sorter_open(
};
let cache_size = program.connection.get_cache_size();
// Set the buffer size threshold to be roughly the same as the limit configured for the page-cache.
let page_size = header_accessor::get_page_size(pager)
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as usize;
let page_size = pager
.io
.block(|| pager.with_header(|header| header.page_size))
.unwrap_or_default()
.get() as usize;
let max_buffer_size_bytes = if cache_size < 0 {
(cache_size.abs() * 1024) as usize
} else {
@@ -4342,7 +4345,8 @@ pub fn op_function(
}
}
ScalarFunc::SqliteVersion => {
let version_integer: i64 = header_accessor::get_version_number(pager)? as i64;
let version_integer =
return_if_io!(pager.with_header(|header| header.version_number)).get() as i64;
let version = execute_sqlite_version(version_integer);
state.registers[*dest] = Register::Value(Value::build_text(version));
}
@@ -6011,8 +6015,12 @@ pub fn op_page_count(
// TODO: implement temp databases
todo!("temp databases not implemented yet");
}
let count = header_accessor::get_database_size(pager).unwrap_or(0);
state.registers[*dest] = Register::Value(Value::Integer(count as i64));
let count = match pager.with_header(|header| header.database_size.get()) {
Err(_) => 0.into(),
Ok(IOResult::Done(v)) => v.into(),
Ok(IOResult::IO) => return Ok(InsnFunctionStepResult::IO),
};
state.registers[*dest] = Register::Value(Value::Integer(count));
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
@@ -6071,15 +6079,19 @@ pub fn op_read_cookie(
// TODO: implement temp databases
todo!("temp databases not implemented yet");
}
let cookie_value = match cookie {
Cookie::ApplicationId => header_accessor::get_application_id(pager).unwrap_or(0) as i64,
Cookie::UserVersion => header_accessor::get_user_version(pager).unwrap_or(0) as i64,
Cookie::SchemaVersion => header_accessor::get_schema_cookie(pager).unwrap_or(0) as i64,
Cookie::LargestRootPageNumber => {
header_accessor::get_vacuum_mode_largest_root_page(pager).unwrap_or(0) as i64
}
let cookie_value = match pager.with_header(|header| match cookie {
Cookie::ApplicationId => header.application_id.get().into(),
Cookie::UserVersion => header.user_version.get().into(),
Cookie::SchemaVersion => header.schema_cookie.get().into(),
Cookie::LargestRootPageNumber => header.vacuum_mode_largest_root_page.get().into(),
cookie => todo!("{cookie:?} is not yet implement for ReadCookie"),
}) {
Err(_) => 0.into(),
Ok(IOResult::Done(v)) => v,
Ok(IOResult::IO) => return Ok(InsnFunctionStepResult::IO),
};
state.registers[*dest] = Register::Value(Value::Integer(cookie_value));
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
@@ -6104,38 +6116,38 @@ pub fn op_set_cookie(
if *db > 0 {
todo!("temp databases not implemented yet");
}
match cookie {
Cookie::ApplicationId => {
header_accessor::set_application_id(pager, *value)?;
}
Cookie::UserVersion => {
header_accessor::set_user_version(pager, *value)?;
}
Cookie::LargestRootPageNumber => {
header_accessor::set_vacuum_mode_largest_root_page(pager, *value as u32)?;
}
Cookie::IncrementalVacuum => {
header_accessor::set_incremental_vacuum_enabled(pager, *value as u32)?;
}
Cookie::SchemaVersion => {
if mv_store.is_none() {
// we update transaction state to indicate that the schema has changed
match program.connection.transaction_state.get() {
TransactionState::Write { schema_did_change } => {
program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true });
},
TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"),
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),
TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"),
}
return_if_io!(pager.with_header_mut(|header| {
match cookie {
Cookie::ApplicationId => header.application_id = (*value).into(),
Cookie::UserVersion => header.user_version = (*value).into(),
Cookie::LargestRootPageNumber => {
header.vacuum_mode_largest_root_page = (*value as u32).into();
}
program
.connection
.with_schema_mut(|schema| schema.schema_version = *value as u32);
header_accessor::set_schema_cookie(pager, *value as u32)?;
}
cookie => todo!("{cookie:?} is not yet implement for SetCookie"),
}
Cookie::IncrementalVacuum => {
header.incremental_vacuum_enabled = (*value as u32).into()
}
Cookie::SchemaVersion => {
if mv_store.is_none() {
// we update transaction state to indicate that the schema has changed
match program.connection.transaction_state.get() {
TransactionState::Write { schema_did_change } => {
program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true });
},
TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"),
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),
TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"),
}
}
program
.connection
.with_schema_mut(|schema| schema.schema_version = *value as u32);
header.schema_cookie = (*value as u32).into();
}
cookie => todo!("{cookie:?} is not yet implement for SetCookie"),
};
}));
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
@@ -6342,9 +6354,11 @@ pub fn op_open_ephemeral(
Arc::new(Mutex::new(())),
)?);
let page_size = header_accessor::get_page_size(&pager)
.unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE)
as usize;
let page_size = pager
.io
.block(|| pager.with_header(|header| header.page_size))
.unwrap_or_default()
.get() as usize;
buffer_pool.set_page_size(page_size);
state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager };