mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-25 12:04:21 +01:00
Merge 'Fix non-4096 page sizes' from Jussi Saurio
Closes #2555 ## Problem The main problem we had with the current implementation of `init_pager()` was that the WAL header was eagerly allocated and written to disk with a page size, and a potential already-set page size on an initialized database was not checked. Given this scenario: - Initialized database with e.g. page size 512 but no WAL - Tursodb connects to DB It would not check the database file for the page size and instead would initialize the WAL header with the default 4096 page size, plus initialize the `BufferPool` similarly with the wrong size, and then panic when reading pages from the DB, expecting to read `4096` instead of `512`, as demonstrated in the reproduction of #2555. ## Fix 1. Add `Database::read_page_size_from_db_header()` method that can be used in the above cases 2. Initialize the WAL header lazily during the first frame append, using the existing `WalFile::ensure_header_if_needed()` method, removing the need to eagerly pass `page_size` when constructing the in-memory `WalFileShared` structure. ## Reader notes This PR follows a fairly logical commit-by-commit structure so you'll preferably want to read it that way. Reviewed-by: Nikita Sivukhin (@sivukhin) Closes #2569
This commit is contained in:
@@ -473,6 +473,9 @@ impl DatabaseFile {
|
||||
}
|
||||
|
||||
impl turso_core::DatabaseStorage for DatabaseFile {
|
||||
fn read_header(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
|
||||
self.file.pread(0, c)
|
||||
}
|
||||
fn read_page(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
|
||||
106
core/lib.rs
106
core/lib.rs
@@ -388,11 +388,7 @@ impl Database {
|
||||
pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
|
||||
let pager = self.init_pager(None)?;
|
||||
|
||||
let page_size = pager
|
||||
.io
|
||||
.block(|| pager.with_header(|header| header.page_size))
|
||||
.unwrap_or_default()
|
||||
.get();
|
||||
let page_size = pager.page_size.get().expect("page size not set");
|
||||
|
||||
let default_cache_size = pager
|
||||
.io
|
||||
@@ -437,16 +433,72 @@ impl Database {
|
||||
self.open_flags.contains(OpenFlags::ReadOnly)
|
||||
}
|
||||
|
||||
fn init_pager(&self, page_size: Option<usize>) -> Result<Pager> {
|
||||
/// If we do not have a physical WAL file, but we know the database file is initialized on disk,
|
||||
/// we need to read the page_size from the database header.
|
||||
fn read_page_size_from_db_header(&self) -> Result<PageSize> {
|
||||
turso_assert!(
|
||||
self.db_state.is_initialized(),
|
||||
"read_page_size_from_db_header called on uninitialized database"
|
||||
);
|
||||
turso_assert!(
|
||||
PageSize::MIN % 512 == 0,
|
||||
"header read must be a multiple of 512 for O_DIRECT"
|
||||
);
|
||||
let buf = Arc::new(Buffer::new_temporary(PageSize::MIN as usize));
|
||||
let c = Completion::new_read(buf.clone(), move |_buf, _| {});
|
||||
let c = self.db_file.read_header(c)?;
|
||||
self.io.wait_for_completion(c)?;
|
||||
let page_size = u16::from_be_bytes(buf.as_slice()[16..18].try_into().unwrap());
|
||||
let page_size = PageSize::new_from_header_u16(page_size)?;
|
||||
Ok(page_size)
|
||||
}
|
||||
|
||||
/// Read the page size in order of preference:
|
||||
/// 1. From the WAL header if it exists and is initialized
|
||||
/// 2. From the database header if the database is initialized
|
||||
///
|
||||
/// Otherwise, fall back to, in order of preference:
|
||||
/// 1. From the requested page size if it is provided
|
||||
/// 2. PageSize::default(), i.e. 4096
|
||||
fn determine_actual_page_size(
|
||||
&self,
|
||||
maybe_shared_wal: Option<&WalFileShared>,
|
||||
requested_page_size: Option<usize>,
|
||||
) -> Result<PageSize> {
|
||||
if let Some(shared_wal) = maybe_shared_wal {
|
||||
let size_in_wal = shared_wal.page_size();
|
||||
if size_in_wal != 0 {
|
||||
let Some(page_size) = PageSize::new(size_in_wal) else {
|
||||
bail_corrupt_error!("invalid page size in WAL: {size_in_wal}");
|
||||
};
|
||||
return Ok(page_size);
|
||||
}
|
||||
}
|
||||
if self.db_state.is_initialized() {
|
||||
Ok(self.read_page_size_from_db_header()?)
|
||||
} else {
|
||||
let Some(size) = requested_page_size else {
|
||||
return Ok(PageSize::default());
|
||||
};
|
||||
let Some(page_size) = PageSize::new(size as u32) else {
|
||||
bail_corrupt_error!("invalid requested page size: {size}");
|
||||
};
|
||||
Ok(page_size)
|
||||
}
|
||||
}
|
||||
|
||||
fn init_pager(&self, requested_page_size: Option<usize>) -> Result<Pager> {
|
||||
// Open existing WAL file if present
|
||||
let mut maybe_shared_wal = self.maybe_shared_wal.write();
|
||||
if let Some(shared_wal) = maybe_shared_wal.clone() {
|
||||
let size = match page_size {
|
||||
None => unsafe { (*shared_wal.get()).page_size() as usize },
|
||||
Some(size) => size,
|
||||
};
|
||||
let page_size = self.determine_actual_page_size(
|
||||
Some(unsafe { &*shared_wal.get() }),
|
||||
requested_page_size,
|
||||
)?;
|
||||
let buffer_pool = self.buffer_pool.clone();
|
||||
buffer_pool.finalize_with_page_size(size)?;
|
||||
if self.db_state.is_initialized() {
|
||||
buffer_pool.finalize_with_page_size(page_size.get() as usize)?;
|
||||
}
|
||||
|
||||
let db_state = self.db_state.clone();
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
@@ -463,10 +515,17 @@ impl Database {
|
||||
db_state,
|
||||
self.init_lock.clone(),
|
||||
)?;
|
||||
pager.page_size.set(Some(page_size));
|
||||
return Ok(pager);
|
||||
}
|
||||
let buffer_pool = self.buffer_pool.clone();
|
||||
|
||||
let page_size = self.determine_actual_page_size(None, requested_page_size)?;
|
||||
|
||||
if self.db_state.is_initialized() {
|
||||
buffer_pool.finalize_with_page_size(page_size.get() as usize)?;
|
||||
}
|
||||
|
||||
// No existing WAL; create one.
|
||||
let db_state = self.db_state.clone();
|
||||
let mut pager = Pager::new(
|
||||
@@ -479,21 +538,10 @@ impl Database {
|
||||
Arc::new(Mutex::new(())),
|
||||
)?;
|
||||
|
||||
let size = match page_size {
|
||||
Some(size) => size as u32,
|
||||
None => {
|
||||
pager // if None is passed in, we know that we already initialized so we can safely call `with_header` here
|
||||
.io
|
||||
.block(|| pager.with_header(|header| header.page_size))
|
||||
.unwrap_or_default()
|
||||
.get()
|
||||
}
|
||||
};
|
||||
|
||||
pager.page_size.set(Some(size));
|
||||
pager.page_size.set(Some(page_size));
|
||||
let wal_path = format!("{}-wal", self.path);
|
||||
let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?;
|
||||
let real_shared_wal = WalFileShared::new_shared(size, &self.io, file)?;
|
||||
let real_shared_wal = WalFileShared::new_shared(file)?;
|
||||
// Modify Database::maybe_shared_wal to point to the new WAL file so that other connections
|
||||
// can open the existing WAL.
|
||||
*maybe_shared_wal = Some(real_shared_wal.clone());
|
||||
@@ -783,7 +831,7 @@ pub struct Connection {
|
||||
cache_size: Cell<i32>,
|
||||
/// page size used for an uninitialized database or the next vacuum command.
|
||||
/// it's not always equal to the current page size of the database
|
||||
page_size: Cell<u32>,
|
||||
page_size: Cell<PageSize>,
|
||||
/// Disable automatic checkpoint behaviour when DB is shutted down or WAL reach certain size
|
||||
/// Client still can manually execute PRAGMA wal_checkpoint(...) commands
|
||||
wal_auto_checkpoint_disabled: Cell<bool>,
|
||||
@@ -1438,7 +1486,7 @@ impl Connection {
|
||||
pub fn set_capture_data_changes(&self, opts: CaptureDataChangesMode) {
|
||||
self.capture_data_changes.replace(opts);
|
||||
}
|
||||
pub fn get_page_size(&self) -> u32 {
|
||||
pub fn get_page_size(&self) -> PageSize {
|
||||
self.page_size.get()
|
||||
}
|
||||
|
||||
@@ -1476,9 +1524,9 @@ impl Connection {
|
||||
/// is first created, if it does not already exist when the page_size pragma is issued,
|
||||
/// or at the next VACUUM command that is run on the same database connection while not in WAL mode.
|
||||
pub fn reset_page_size(&self, size: u32) -> Result<()> {
|
||||
if PageSize::new(size).is_none() {
|
||||
let Some(size) = PageSize::new(size) else {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
self.page_size.set(size);
|
||||
if self._db.db_state.get() != DbState::Uninitialized {
|
||||
@@ -1487,7 +1535,7 @@ impl Connection {
|
||||
|
||||
*self._db.maybe_shared_wal.write() = None;
|
||||
self.pager.borrow_mut().clear_page_cache();
|
||||
let pager = self._db.init_pager(Some(size as usize))?;
|
||||
let pager = self._db.init_pager(Some(size.get() as usize))?;
|
||||
self.pager.replace(Rc::new(pager));
|
||||
self.pager.borrow().set_initial_page_size(size);
|
||||
|
||||
|
||||
@@ -5261,7 +5261,13 @@ impl BTreeCursor {
|
||||
|
||||
fn get_immutable_record_or_create(&self) -> std::cell::RefMut<'_, Option<ImmutableRecord>> {
|
||||
if self.reusable_immutable_record.borrow().is_none() {
|
||||
let record = ImmutableRecord::new(4096);
|
||||
let page_size = self
|
||||
.pager
|
||||
.page_size
|
||||
.get()
|
||||
.expect("page size is not set")
|
||||
.get();
|
||||
let record = ImmutableRecord::new(page_size as usize);
|
||||
self.reusable_immutable_record.replace(Some(record));
|
||||
}
|
||||
self.reusable_immutable_record.borrow_mut()
|
||||
@@ -8463,7 +8469,7 @@ mod tests {
|
||||
));
|
||||
|
||||
let wal_file = io.open_file("test.wal", OpenFlags::Create, false).unwrap();
|
||||
let wal_shared = WalFileShared::new_shared(page_size as u32, &io, wal_file).unwrap();
|
||||
let wal_shared = WalFileShared::new_shared(wal_file).unwrap();
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
io.clone(),
|
||||
wal_shared,
|
||||
|
||||
@@ -9,6 +9,7 @@ use tracing::{instrument, Level};
|
||||
/// the storage medium. A database can either be a file on disk, like in SQLite,
|
||||
/// or something like a remote page server service.
|
||||
pub trait DatabaseStorage: Send + Sync {
|
||||
fn read_header(&self, c: Completion) -> Result<Completion>;
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<Completion>;
|
||||
fn write_page(&self, page_idx: usize, buffer: Arc<Buffer>, c: Completion)
|
||||
-> Result<Completion>;
|
||||
@@ -37,6 +38,10 @@ unsafe impl Sync for DatabaseFile {}
|
||||
|
||||
#[cfg(feature = "fs")]
|
||||
impl DatabaseStorage for DatabaseFile {
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn read_header(&self, c: Completion) -> Result<Completion> {
|
||||
self.file.pread(0, c)
|
||||
}
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<Completion> {
|
||||
let r = c.as_read();
|
||||
|
||||
@@ -424,7 +424,7 @@ pub struct Pager {
|
||||
/// Cache page_size and reserved_space at Pager init and reuse for subsequent
|
||||
/// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality
|
||||
/// to change it.
|
||||
pub(crate) page_size: Cell<Option<u32>>,
|
||||
pub(crate) page_size: Cell<Option<PageSize>>,
|
||||
reserved_space: OnceCell<u8>,
|
||||
free_page_state: RefCell<FreePageState>,
|
||||
/// Maximum number of pages allowed in the database. Default is 1073741823 (SQLite default).
|
||||
@@ -888,7 +888,6 @@ impl Pager {
|
||||
self.io
|
||||
.block(|| self.with_header(|header| header.page_size))
|
||||
.unwrap_or_default()
|
||||
.get()
|
||||
});
|
||||
|
||||
let reserved_space = *self.reserved_space.get_or_init(|| {
|
||||
@@ -897,11 +896,11 @@ impl Pager {
|
||||
.unwrap_or_default()
|
||||
});
|
||||
|
||||
(page_size as usize) - (reserved_space as usize)
|
||||
(page_size.get() as usize) - (reserved_space as usize)
|
||||
}
|
||||
|
||||
/// Set the initial page size for the database. Should only be called before the database is initialized
|
||||
pub fn set_initial_page_size(&self, size: u32) {
|
||||
pub fn set_initial_page_size(&self, size: PageSize) {
|
||||
assert_eq!(self.db_state.get(), DbState::Uninitialized);
|
||||
self.page_size.replace(Some(size));
|
||||
}
|
||||
@@ -1148,7 +1147,11 @@ impl Pager {
|
||||
);
|
||||
page
|
||||
};
|
||||
let c = wal.borrow_mut().append_frame(page.clone(), 0)?;
|
||||
let c = wal.borrow_mut().append_frame(
|
||||
page.clone(),
|
||||
self.page_size.get().expect("page size not set"),
|
||||
0,
|
||||
)?;
|
||||
// TODO: invalidade previous completions if this one fails
|
||||
completions.push(c);
|
||||
}
|
||||
@@ -1208,7 +1211,11 @@ impl Pager {
|
||||
};
|
||||
|
||||
// TODO: invalidade previous completions on error here
|
||||
let c = wal.borrow_mut().append_frame(page.clone(), db_size)?;
|
||||
let c = wal.borrow_mut().append_frame(
|
||||
page.clone(),
|
||||
self.page_size.get().expect("page size not set"),
|
||||
db_size,
|
||||
)?;
|
||||
completions.push(c);
|
||||
}
|
||||
self.dirty_pages.borrow_mut().clear();
|
||||
@@ -1390,8 +1397,8 @@ impl Pager {
|
||||
.io
|
||||
.block(|| self.with_header(|header| header.database_size))?
|
||||
.get();
|
||||
let page_size = self.page_size.get().unwrap_or(PageSize::DEFAULT as u32);
|
||||
let expected = (db_size * page_size) as u64;
|
||||
let page_size = self.page_size.get().unwrap_or_default();
|
||||
let expected = (db_size * page_size.get()) as u64;
|
||||
if expected < self.db_file.size()? {
|
||||
self.io.wait_for_completion(self.db_file.truncate(
|
||||
expected as usize,
|
||||
@@ -1559,7 +1566,7 @@ impl Pager {
|
||||
default_header.database_size = 1.into();
|
||||
|
||||
if let Some(size) = self.page_size.get() {
|
||||
default_header.page_size = PageSize::new(size).expect("page size");
|
||||
default_header.page_size = size;
|
||||
}
|
||||
self.buffer_pool
|
||||
.finalize_with_page_size(default_header.page_size.get() as usize)?;
|
||||
@@ -2215,8 +2222,6 @@ mod ptrmap_tests {
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
io.clone(),
|
||||
WalFileShared::new_shared(
|
||||
page_size,
|
||||
&io,
|
||||
io.open_file("test.db-wal", OpenFlags::Create, false)
|
||||
.unwrap(),
|
||||
)
|
||||
|
||||
@@ -62,7 +62,7 @@ use crate::storage::database::DatabaseStorage;
|
||||
use crate::storage::pager::Pager;
|
||||
use crate::storage::wal::{PendingFlush, READMARK_NOT_USED};
|
||||
use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
|
||||
use crate::{turso_assert, File, Result, WalFileShared};
|
||||
use crate::{bail_corrupt_error, turso_assert, File, Result, WalFileShared};
|
||||
use std::cell::{RefCell, UnsafeCell};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::mem::MaybeUninit;
|
||||
@@ -89,6 +89,7 @@ impl PageSize {
|
||||
pub const MAX: u32 = 65536;
|
||||
pub const DEFAULT: u16 = 4096;
|
||||
|
||||
/// Interpret a user-provided u32 as either a valid page size or None.
|
||||
pub const fn new(size: u32) -> Option<Self> {
|
||||
if size < PageSize::MIN || size > PageSize::MAX {
|
||||
return None;
|
||||
@@ -100,12 +101,28 @@ impl PageSize {
|
||||
}
|
||||
|
||||
if size == PageSize::MAX {
|
||||
// Internally, the value 1 represents 65536, since the on-disk value of the page size in the DB header is 2 bytes.
|
||||
return Some(Self(U16BE::new(1)));
|
||||
}
|
||||
|
||||
Some(Self(U16BE::new(size as u16)))
|
||||
}
|
||||
|
||||
/// Interpret a u16 on disk (DB file header) as either a valid page size or
|
||||
/// return a corrupt error.
|
||||
pub fn new_from_header_u16(value: u16) -> Result<Self> {
|
||||
match value {
|
||||
1 => Ok(Self(U16BE::new(1))),
|
||||
n => {
|
||||
let Some(size) = Self::new(n as u32) else {
|
||||
bail_corrupt_error!("invalid page size in database header: {n}");
|
||||
};
|
||||
|
||||
Ok(size)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub const fn get(self) -> u32 {
|
||||
match self.0.get() {
|
||||
1 => Self::MAX,
|
||||
@@ -947,7 +964,7 @@ pub fn write_pages_vectored(
|
||||
// batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's
|
||||
// to submit as `writev`/write_pages calls.
|
||||
|
||||
let page_sz = pager.page_size.get().unwrap_or(PageSize::DEFAULT as u32) as usize;
|
||||
let page_sz = pager.page_size.get().expect("page size is not set").get() as usize;
|
||||
|
||||
// Count expected number of runs to create the atomic counter we need to track each batch
|
||||
let mut run_count = 0;
|
||||
|
||||
@@ -16,10 +16,10 @@ use crate::io::{File, IO};
|
||||
use crate::result::LimboResult;
|
||||
use crate::storage::sqlite3_ondisk::{
|
||||
begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame,
|
||||
write_pages_vectored, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
|
||||
write_pages_vectored, PageSize, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
|
||||
};
|
||||
use crate::types::{IOCompletions, IOResult};
|
||||
use crate::{turso_assert, Buffer, LimboError, Result};
|
||||
use crate::{bail_corrupt_error, turso_assert, Buffer, LimboError, Result};
|
||||
use crate::{Completion, Page};
|
||||
|
||||
use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE};
|
||||
@@ -257,7 +257,12 @@ pub trait Wal {
|
||||
/// db_size > 0 -> last frame written in transaction
|
||||
/// db_size == 0 -> non-last frame written in transaction
|
||||
/// write_counter is the counter we use to track when the I/O operation starts and completes
|
||||
fn append_frame(&mut self, page: PageRef, db_size: u32) -> Result<Completion>;
|
||||
fn append_frame(
|
||||
&mut self,
|
||||
page: PageRef,
|
||||
page_size: PageSize,
|
||||
db_size: u32,
|
||||
) -> Result<Completion>;
|
||||
|
||||
/// Complete append of frames by updating shared wal state. Before this
|
||||
/// all changes were stored locally.
|
||||
@@ -951,9 +956,10 @@ impl Wal for WalFile {
|
||||
db_size: u64,
|
||||
page: &[u8],
|
||||
) -> Result<()> {
|
||||
if self.get_max_frame_in_wal() == 0 {
|
||||
self.ensure_header_if_needed()?;
|
||||
}
|
||||
let Some(page_size) = PageSize::new(page.len() as u32) else {
|
||||
bail_corrupt_error!("invalid page size: {}", page.len());
|
||||
};
|
||||
self.ensure_header_if_needed(page_size)?;
|
||||
tracing::debug!("write_raw_frame({})", frame_id);
|
||||
if page.len() != self.page_size() as usize {
|
||||
return Err(LimboError::InvalidArgument(format!(
|
||||
@@ -1031,11 +1037,20 @@ impl Wal for WalFile {
|
||||
|
||||
/// Write a frame to the WAL.
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn append_frame(&mut self, page: PageRef, db_size: u32) -> Result<Completion> {
|
||||
fn append_frame(
|
||||
&mut self,
|
||||
page: PageRef,
|
||||
page_size: PageSize,
|
||||
db_size: u32,
|
||||
) -> Result<Completion> {
|
||||
self.ensure_header_if_needed(page_size)?;
|
||||
let shared = self.get_shared();
|
||||
if shared.max_frame.load(Ordering::Acquire).eq(&0) {
|
||||
self.ensure_header_if_needed()?;
|
||||
}
|
||||
let shared_page_size = shared.wal_header.lock().page_size;
|
||||
turso_assert!(
|
||||
shared_page_size == page_size.get(),
|
||||
"page size mismatch - tried to change page size after WAL header was already initialized: shared.page_size={shared_page_size}, page_size={}",
|
||||
page_size.get()
|
||||
);
|
||||
let page_id = page.get().id;
|
||||
let frame_id = self.max_frame + 1;
|
||||
let offset = self.frame_offset(frame_id);
|
||||
@@ -1283,20 +1298,25 @@ impl WalFile {
|
||||
|
||||
/// the WAL file has been truncated and we are writing the first
|
||||
/// frame since then. We need to ensure that the header is initialized.
|
||||
fn ensure_header_if_needed(&mut self) -> Result<()> {
|
||||
fn ensure_header_if_needed(&mut self, page_size: PageSize) -> Result<()> {
|
||||
if self.get_shared().is_initialized()? {
|
||||
return Ok(());
|
||||
}
|
||||
tracing::debug!("ensure_header_if_needed");
|
||||
self.last_checksum = {
|
||||
let shared = self.get_shared();
|
||||
if shared.max_frame.load(Ordering::Acquire) != 0 {
|
||||
return Ok(());
|
||||
}
|
||||
if shared.file.size()? >= WAL_HEADER_SIZE as u64 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut hdr = shared.wal_header.lock();
|
||||
hdr.magic = if cfg!(target_endian = "big") {
|
||||
WAL_MAGIC_BE
|
||||
} else {
|
||||
WAL_MAGIC_LE
|
||||
};
|
||||
if hdr.page_size == 0 {
|
||||
hdr.page_size = self.page_size();
|
||||
hdr.page_size = page_size.get();
|
||||
}
|
||||
if hdr.salt_1 == 0 && hdr.salt_2 == 0 {
|
||||
hdr.salt_1 = self.io.generate_random_number() as u32;
|
||||
hdr.salt_2 = self.io.generate_random_number() as u32;
|
||||
}
|
||||
|
||||
// recompute header checksum
|
||||
@@ -1729,47 +1749,26 @@ impl WalFileShared {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_shared(
|
||||
page_size: u32,
|
||||
io: &Arc<dyn IO>,
|
||||
file: Arc<dyn File>,
|
||||
) -> Result<Arc<UnsafeCell<WalFileShared>>> {
|
||||
pub fn is_initialized(&self) -> Result<bool> {
|
||||
Ok(self.file.size()? >= WAL_HEADER_SIZE as u64)
|
||||
}
|
||||
|
||||
pub fn new_shared(file: Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFileShared>>> {
|
||||
let magic = if cfg!(target_endian = "big") {
|
||||
WAL_MAGIC_BE
|
||||
} else {
|
||||
WAL_MAGIC_LE
|
||||
};
|
||||
let mut wal_header = WalHeader {
|
||||
let wal_header = WalHeader {
|
||||
magic,
|
||||
file_format: 3007000,
|
||||
page_size,
|
||||
page_size: 0, // Signifies WAL header that is not persistent on disk yet.
|
||||
checkpoint_seq: 0, // TODO implement sequence number
|
||||
salt_1: io.generate_random_number() as u32,
|
||||
salt_2: io.generate_random_number() as u32,
|
||||
salt_1: 0,
|
||||
salt_2: 0,
|
||||
checksum_1: 0,
|
||||
checksum_2: 0,
|
||||
};
|
||||
let native = cfg!(target_endian = "big"); // if target_endian is
|
||||
// already big then we don't care but if isn't, header hasn't yet been
|
||||
// encoded to big endian, therefore we want to swap bytes to compute this
|
||||
// checksum.
|
||||
let checksums = (0, 0);
|
||||
let checksums = checksum_wal(
|
||||
&wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes
|
||||
&wal_header,
|
||||
checksums,
|
||||
native, // this is false because we haven't encoded the wal header yet
|
||||
);
|
||||
wal_header.checksum_1 = checksums.0;
|
||||
wal_header.checksum_2 = checksums.1;
|
||||
let c = sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
|
||||
let header = Arc::new(SpinLock::new(wal_header));
|
||||
let checksum = {
|
||||
let checksum = header.lock();
|
||||
(checksum.checksum_1, checksum.checksum_2)
|
||||
};
|
||||
io.wait_for_completion(c)?;
|
||||
tracing::debug!("new_shared(header={:?})", header);
|
||||
let read_locks = array::from_fn(|_| TursoRwLock::new());
|
||||
// slot zero is always zero as it signifies that reads can be done from the db file
|
||||
// directly, and slot 1 is the default read mark containing the max frame. in this case
|
||||
@@ -1785,7 +1784,7 @@ impl WalFileShared {
|
||||
max_frame: AtomicU64::new(0),
|
||||
nbackfills: AtomicU64::new(0),
|
||||
frame_cache: Arc::new(SpinLock::new(HashMap::new())),
|
||||
last_checksum: checksum,
|
||||
last_checksum: (0, 0),
|
||||
file,
|
||||
pages_in_frames: Arc::new(SpinLock::new(Vec::new())),
|
||||
read_locks,
|
||||
|
||||
@@ -484,7 +484,7 @@ fn query_pragma(
|
||||
pager
|
||||
.io
|
||||
.block(|| pager.with_header(|header| header.page_size.get()))
|
||||
.unwrap_or(connection.get_page_size()) as i64,
|
||||
.unwrap_or(connection.get_page_size().get()) as i64,
|
||||
register,
|
||||
);
|
||||
program.emit_result_row(register, 1);
|
||||
|
||||
@@ -6785,8 +6785,9 @@ pub fn op_open_ephemeral(
|
||||
let page_size = pager
|
||||
.io
|
||||
.block(|| pager.with_header(|header| header.page_size))
|
||||
.unwrap_or_default()
|
||||
.get() as usize;
|
||||
.unwrap_or_default();
|
||||
|
||||
pager.page_size.set(Some(page_size));
|
||||
|
||||
state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager };
|
||||
}
|
||||
|
||||
@@ -57,3 +57,110 @@ fn test_pragma_module_list_generate_series() {
|
||||
|
||||
assert!(found, "generate_series should appear in module_list");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pragma_page_sizes_without_writes_persists() {
|
||||
for test_page_size in [512, 1024, 2048, 4096, 8192, 16384, 32768, 65536] {
|
||||
let db = TempDatabase::new_empty(false);
|
||||
{
|
||||
let conn = db.connect_limbo();
|
||||
let pragma_query = format!("PRAGMA page_size={test_page_size}");
|
||||
conn.execute(&pragma_query).unwrap();
|
||||
conn.execute("PRAGMA user_version=1").unwrap(); // even sqlite behavior is that just changing page_size pragma doesn't persist it, so we do this to make a minimal persistent change
|
||||
}
|
||||
|
||||
let conn = db.connect_limbo();
|
||||
let mut rows = conn.query("PRAGMA page_size").unwrap().unwrap();
|
||||
let StepResult::Row = rows.step().unwrap() else {
|
||||
panic!("expected row");
|
||||
};
|
||||
let row = rows.row().unwrap();
|
||||
let Value::Integer(page_size) = row.get_value(0) else {
|
||||
panic!("expected integer value");
|
||||
};
|
||||
assert_eq!(*page_size, test_page_size);
|
||||
|
||||
// Reopen database and verify page size
|
||||
let db = TempDatabase::new_with_existent(&db.path, false);
|
||||
let conn = db.connect_limbo();
|
||||
let mut rows = conn.query("PRAGMA page_size").unwrap().unwrap();
|
||||
let StepResult::Row = rows.step().unwrap() else {
|
||||
panic!("expected row");
|
||||
};
|
||||
let row = rows.row().unwrap();
|
||||
let Value::Integer(page_size) = row.get_value(0) else {
|
||||
panic!("expected integer value");
|
||||
};
|
||||
assert_eq!(*page_size, test_page_size);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pragma_page_sizes_with_writes_persists() {
|
||||
for test_page_size in [512, 1024, 2048, 4096, 8192, 16384, 32768, 65536] {
|
||||
let db = TempDatabase::new_empty(false);
|
||||
{
|
||||
{
|
||||
let conn = db.connect_limbo();
|
||||
let pragma_query = format!("PRAGMA page_size={test_page_size}");
|
||||
conn.execute(&pragma_query).unwrap();
|
||||
|
||||
// Create table and insert data
|
||||
conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)")
|
||||
.unwrap();
|
||||
conn.execute("INSERT INTO test (id, value) VALUES (1, 'test data')")
|
||||
.unwrap();
|
||||
// Insert a big blob just as a small smoke test that our btree handles this well with different page sizes.
|
||||
conn.execute("INSERT INTO test (id, value) VALUES (2, randomblob(1024*1024))")
|
||||
.unwrap();
|
||||
let mut page_size = conn.pragma_query("page_size").unwrap();
|
||||
let mut page_size = page_size.pop().unwrap();
|
||||
let page_size = page_size.pop().unwrap();
|
||||
let Value::Integer(page_size) = page_size else {
|
||||
panic!("expected integer value");
|
||||
};
|
||||
assert_eq!(page_size, test_page_size);
|
||||
} // Connection is dropped here
|
||||
|
||||
// Reopen database and verify page size and data
|
||||
let conn = db.connect_limbo();
|
||||
|
||||
// Check page size is still test_page_size
|
||||
let mut page_size = conn.pragma_query("page_size").unwrap();
|
||||
let mut page_size = page_size.pop().unwrap();
|
||||
let page_size = page_size.pop().unwrap();
|
||||
let Value::Integer(page_size) = page_size else {
|
||||
panic!("expected integer value");
|
||||
};
|
||||
assert_eq!(page_size, test_page_size);
|
||||
|
||||
// Verify data can still be read
|
||||
let mut rows = conn
|
||||
.query("SELECT value FROM test WHERE id = 1")
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
loop {
|
||||
if let StepResult::Row = rows.step().unwrap() {
|
||||
let row = rows.row().unwrap();
|
||||
let Value::Text(value) = row.get_value(0) else {
|
||||
panic!("expected text value");
|
||||
};
|
||||
assert_eq!(value.as_str(), "test data");
|
||||
break;
|
||||
}
|
||||
rows.run_once().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Drop the db and reopen it, and verify the same
|
||||
let db = TempDatabase::new_with_existent(&db.path, false);
|
||||
let conn = db.connect_limbo();
|
||||
let mut page_size = conn.pragma_query("page_size").unwrap();
|
||||
let mut page_size = page_size.pop().unwrap();
|
||||
let page_size = page_size.pop().unwrap();
|
||||
let Value::Integer(page_size) = page_size else {
|
||||
panic!("expected integer value");
|
||||
};
|
||||
assert_eq!(page_size, test_page_size);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user