diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 26af16cc4..5c06766cb 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -474,12 +474,6 @@ impl DatabaseFile { impl turso_core::DatabaseStorage for DatabaseFile { fn read_header(&self, c: turso_core::Completion) -> turso_core::Result { - let r = c.as_read(); - let size = r.buf().len(); - assert!( - size == 100, - "the size of the database header must be 100 bytes, got {size}" - ); self.file.pread(0, c) } fn read_page( diff --git a/core/lib.rs b/core/lib.rs index c1c2d943a..5fbf8b751 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -388,10 +388,7 @@ impl Database { pub fn connect(self: &Arc) -> Result> { let pager = self.init_pager(None)?; - let page_size = pager - .io - .block(|| pager.with_header(|header| header.page_size)) - .unwrap_or_default(); + let page_size = pager.page_size.get().expect("page size not set"); let default_cache_size = pager .io @@ -436,16 +433,67 @@ impl Database { self.open_flags.contains(OpenFlags::ReadOnly) } - fn init_pager(&self, page_size: Option) -> Result { + /// If we do not have a physical WAL file, but we know the database file is initialized on disk, + /// we need to read the page_size from the database header. + fn read_page_size_from_db_header(&self) -> Result { + turso_assert!( + self.db_state.is_initialized(), + "read_page_size_from_db_header called on uninitialized database" + ); + let buf = Arc::new(Buffer::new_temporary(PageSize::MIN as usize)); // this needs to be a block multiple for O_DIRECT + let c = Completion::new_read(buf.clone(), move |_buf, _| {}); + let c = self.db_file.read_header(c)?; + self.io.wait_for_completion(c)?; + let page_size = u16::from_be_bytes(buf.as_slice()[16..18].try_into().unwrap()); + Ok(PageSize::new(page_size as u32).unwrap_or_else(|| { + panic!("invalid page size in DB header: {page_size}"); + })) + } + + /// Read the page size in order of preference: + /// 1. From the WAL header if it exists and is initialized + /// 2. From the database header if the database is initialized + /// + /// Otherwise, fall back to, in order of preference: + /// 1. From the requested page size if it is provided + /// 2. PageSize::default(), i.e. 4096 + fn determine_actual_page_size( + &self, + maybe_shared_wal: Option<&WalFileShared>, + requested_page_size: Option, + ) -> Result { + if let Some(shared_wal) = maybe_shared_wal { + let size_in_wal = shared_wal.page_size(); + if size_in_wal != 0 { + return Ok(PageSize::new(size_in_wal).unwrap_or_else(|| { + panic!("invalid page size in WAL: {size_in_wal}"); + })); + } + } + if self.db_state.is_initialized() { + Ok(self.read_page_size_from_db_header()?) + } else { + let Some(size) = requested_page_size else { + return Ok(PageSize::default()); + }; + Ok(PageSize::new(size as u32).unwrap_or_else(|| { + panic!("invalid requested page size: {size}"); + })) + } + } + + fn init_pager(&self, requested_page_size: Option) -> Result { // Open existing WAL file if present let mut maybe_shared_wal = self.maybe_shared_wal.write(); if let Some(shared_wal) = maybe_shared_wal.clone() { - let size = match page_size { - None => unsafe { (*shared_wal.get()).page_size() as usize }, - Some(size) => size, - }; + let page_size = self.determine_actual_page_size( + Some(unsafe { &*shared_wal.get() }), + requested_page_size, + )?; let buffer_pool = self.buffer_pool.clone(); - buffer_pool.finalize_with_page_size(size)?; + if self.db_state.is_initialized() { + buffer_pool.finalize_with_page_size(page_size.get() as usize)?; + } let db_state = self.db_state.clone(); let wal = Rc::new(RefCell::new(WalFile::new( @@ -462,10 +510,17 @@ impl Database { db_state, self.init_lock.clone(), )?; + pager.page_size.set(Some(page_size)); return Ok(pager); } let buffer_pool = self.buffer_pool.clone(); + let page_size = self.determine_actual_page_size(None, requested_page_size)?; + + if self.db_state.is_initialized() { + buffer_pool.finalize_with_page_size(page_size.get() as usize)?; + } + // No existing WAL; create one. let db_state = self.db_state.clone(); let mut pager = Pager::new( @@ -478,22 +533,10 @@ impl Database { Arc::new(Mutex::new(())), )?; - let size = match page_size { - Some(size) => PageSize::new(size as u32).unwrap_or_else(|| { - panic!("invalid page size: {size}"); - }), - None => { - pager // if None is passed in, we know that we already initialized so we can safely call `with_header` here - .io - .block(|| pager.with_header(|header| header.page_size)) - .unwrap_or_default() - } - }; - - pager.page_size.set(Some(size)); + pager.page_size.set(Some(page_size)); let wal_path = format!("{}-wal", self.path); let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?; - let real_shared_wal = WalFileShared::new_shared(size.get(), &self.io, file)?; + let real_shared_wal = WalFileShared::new_shared(file)?; // Modify Database::maybe_shared_wal to point to the new WAL file so that other connections // can open the existing WAL. *maybe_shared_wal = Some(real_shared_wal.clone()); diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 0d4fe9414..214dfde9a 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -8469,7 +8469,7 @@ mod tests { )); let wal_file = io.open_file("test.wal", OpenFlags::Create, false).unwrap(); - let wal_shared = WalFileShared::new_shared(page_size as u32, &io, wal_file).unwrap(); + let wal_shared = WalFileShared::new_shared(wal_file).unwrap(); let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), wal_shared, diff --git a/core/storage/database.rs b/core/storage/database.rs index 8e99bca56..ddcf39827 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -40,12 +40,6 @@ unsafe impl Sync for DatabaseFile {} impl DatabaseStorage for DatabaseFile { #[instrument(skip_all, level = Level::DEBUG)] fn read_header(&self, c: Completion) -> Result { - let r = c.as_read(); - let size = r.buf().len(); - assert!( - size == 100, - "the size of the database header must be 100 bytes, got {size}" - ); self.file.pread(0, c) } #[instrument(skip_all, level = Level::DEBUG)] diff --git a/core/storage/pager.rs b/core/storage/pager.rs index d42174b65..988e07f6c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1147,7 +1147,11 @@ impl Pager { ); page }; - let c = wal.borrow_mut().append_frame(page.clone(), 0)?; + let c = wal.borrow_mut().append_frame( + page.clone(), + self.page_size.get().expect("page size not set"), + 0, + )?; // TODO: invalidade previous completions if this one fails completions.push(c); } @@ -1207,7 +1211,11 @@ impl Pager { }; // TODO: invalidade previous completions on error here - let c = wal.borrow_mut().append_frame(page.clone(), db_size)?; + let c = wal.borrow_mut().append_frame( + page.clone(), + self.page_size.get().expect("page size not set"), + db_size, + )?; completions.push(c); } self.dirty_pages.borrow_mut().clear(); @@ -2214,8 +2222,6 @@ mod ptrmap_tests { let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), WalFileShared::new_shared( - page_size, - &io, io.open_file("test.db-wal", OpenFlags::Create, false) .unwrap(), ) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index b02744101..61a42543d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -16,7 +16,7 @@ use crate::io::{File, IO}; use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame, - write_pages_vectored, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, + write_pages_vectored, PageSize, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::types::{IOCompletions, IOResult}; use crate::{turso_assert, Buffer, LimboError, Result}; @@ -257,7 +257,12 @@ pub trait Wal { /// db_size > 0 -> last frame written in transaction /// db_size == 0 -> non-last frame written in transaction /// write_counter is the counter we use to track when the I/O operation starts and completes - fn append_frame(&mut self, page: PageRef, db_size: u32) -> Result; + fn append_frame( + &mut self, + page: PageRef, + page_size: PageSize, + db_size: u32, + ) -> Result; /// Complete append of frames by updating shared wal state. Before this /// all changes were stored locally. @@ -951,9 +956,9 @@ impl Wal for WalFile { db_size: u64, page: &[u8], ) -> Result<()> { - if self.get_max_frame_in_wal() == 0 { - self.ensure_header_if_needed()?; - } + self.ensure_header_if_needed(PageSize::new(page.len() as u32).unwrap_or_else(|| { + panic!("invalid page size: {}", page.len()); + }))?; tracing::debug!("write_raw_frame({})", frame_id); if page.len() != self.page_size() as usize { return Err(LimboError::InvalidArgument(format!( @@ -1031,11 +1036,20 @@ impl Wal for WalFile { /// Write a frame to the WAL. #[instrument(skip_all, level = Level::DEBUG)] - fn append_frame(&mut self, page: PageRef, db_size: u32) -> Result { + fn append_frame( + &mut self, + page: PageRef, + page_size: PageSize, + db_size: u32, + ) -> Result { + self.ensure_header_if_needed(page_size)?; let shared = self.get_shared(); - if shared.max_frame.load(Ordering::Acquire).eq(&0) { - self.ensure_header_if_needed()?; - } + let shared_page_size = shared.wal_header.lock().page_size; + turso_assert!( + shared_page_size == page_size.get(), + "page size mismatch - tried to change page size after WAL header was already initialized: shared.page_size={shared_page_size}, page_size={}", + page_size.get() + ); let page_id = page.get().id; let frame_id = self.max_frame + 1; let offset = self.frame_offset(frame_id); @@ -1283,20 +1297,26 @@ impl WalFile { /// the WAL file has been truncated and we are writing the first /// frame since then. We need to ensure that the header is initialized. - fn ensure_header_if_needed(&mut self) -> Result<()> { + fn ensure_header_if_needed(&mut self, page_size: PageSize) -> Result<()> { + if self.get_shared().is_initialized()? { + return Ok(()); + } tracing::debug!("ensure_header_if_needed"); self.last_checksum = { let shared = self.get_shared(); - if shared.max_frame.load(Ordering::Acquire) != 0 { - return Ok(()); - } - if shared.file.size()? >= WAL_HEADER_SIZE as u64 { - return Ok(()); - } - let mut hdr = shared.wal_header.lock(); + hdr.magic = if cfg!(target_endian = "big") { + WAL_MAGIC_BE + } else { + WAL_MAGIC_LE + }; if hdr.page_size == 0 { - hdr.page_size = self.page_size(); + hdr.page_size = page_size.get(); + } + if hdr.salt_1 == 0 { + hdr.salt_1 = self.io.generate_random_number() as u32; + turso_assert!(hdr.salt_2 == 0, "salt_2 must be zero"); + hdr.salt_2 = self.io.generate_random_number() as u32; } // recompute header checksum @@ -1729,47 +1749,26 @@ impl WalFileShared { } } - pub fn new_shared( - page_size: u32, - io: &Arc, - file: Arc, - ) -> Result>> { + pub fn is_initialized(&self) -> Result { + Ok(self.file.size()? >= WAL_HEADER_SIZE as u64) + } + + pub fn new_shared(file: Arc) -> Result>> { let magic = if cfg!(target_endian = "big") { WAL_MAGIC_BE } else { WAL_MAGIC_LE }; - let mut wal_header = WalHeader { + let wal_header = WalHeader { magic, file_format: 3007000, - page_size, + page_size: 0, // Signifies WAL header that is not persistent on disk yet. checkpoint_seq: 0, // TODO implement sequence number - salt_1: io.generate_random_number() as u32, - salt_2: io.generate_random_number() as u32, + salt_1: 0, + salt_2: 0, checksum_1: 0, checksum_2: 0, }; - let native = cfg!(target_endian = "big"); // if target_endian is - // already big then we don't care but if isn't, header hasn't yet been - // encoded to big endian, therefore we want to swap bytes to compute this - // checksum. - let checksums = (0, 0); - let checksums = checksum_wal( - &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes - &wal_header, - checksums, - native, // this is false because we haven't encoded the wal header yet - ); - wal_header.checksum_1 = checksums.0; - wal_header.checksum_2 = checksums.1; - let c = sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; - let header = Arc::new(SpinLock::new(wal_header)); - let checksum = { - let checksum = header.lock(); - (checksum.checksum_1, checksum.checksum_2) - }; - io.wait_for_completion(c)?; - tracing::debug!("new_shared(header={:?})", header); let read_locks = array::from_fn(|_| TursoRwLock::new()); // slot zero is always zero as it signifies that reads can be done from the db file // directly, and slot 1 is the default read mark containing the max frame. in this case @@ -1785,7 +1784,7 @@ impl WalFileShared { max_frame: AtomicU64::new(0), nbackfills: AtomicU64::new(0), frame_cache: Arc::new(SpinLock::new(HashMap::new())), - last_checksum: checksum, + last_checksum: (0, 0), file, pages_in_frames: Arc::new(SpinLock::new(Vec::new())), read_locks, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 623b10d12..3217de181 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -6785,8 +6785,9 @@ pub fn op_open_ephemeral( let page_size = pager .io .block(|| pager.with_header(|header| header.page_size)) - .unwrap_or_default() - .get() as usize; + .unwrap_or_default(); + + pager.page_size.set(Some(page_size)); state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager }; }