diff --git a/core/storage/pager.rs b/core/storage/pager.rs index bc0d92480..da5bae08c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -316,8 +316,16 @@ impl Page { #[derive(Clone, Copy, Debug)] /// The state of the current pager cache commit. enum CommitState { + /// Prepare WAL header for commit if needed + PrepareWal, + /// Sync WAL header after prepare + PrepareWalSync, + /// Get DB size (mostly from page cache - but in rare cases we can read it from disk) + GetDbSize, /// Appends all frames to the WAL. - Start, + PrepareFrames { + db_size: u32, + }, /// Fsync the on-disk WAL. SyncWal, /// Checkpoint the WAL to the database file (if needed). @@ -581,7 +589,7 @@ impl Pager { commit_info: CommitInfo { result: RefCell::new(None), completions: RefCell::new(Vec::new()), - state: CommitState::Start.into(), + state: CommitState::PrepareWal.into(), time: now.into(), }, syncing: Rc::new(Cell::new(false)), @@ -1258,6 +1266,14 @@ impl Pager { let mut completions: Vec = Vec::new(); let mut pages = Vec::with_capacity(len); let page_sz = self.page_size.get().unwrap_or_default(); + + let prepare = wal.borrow_mut().prepare_wal_start(page_sz)?; + if let Some(c) = prepare { + self.io.wait_for_completion(c)?; + let c = wal.borrow_mut().prepare_wal_finish()?; + self.io.wait_for_completion(c)?; + } + let commit_frame = None; // cacheflush only so we are not setting a commit frame here for (idx, page_id) in dirty_pages.iter().enumerate() { let page = { @@ -1314,6 +1330,26 @@ impl Pager { wal_auto_checkpoint_disabled: bool, sync_mode: crate::SyncMode, data_sync_retry: bool, + ) -> Result> { + match self.commit_dirty_pages_inner( + wal_auto_checkpoint_disabled, + sync_mode, + data_sync_retry, + ) { + r @ (Ok(IOResult::Done(..)) | Err(..)) => { + self.commit_info.state.set(CommitState::PrepareWal); + r + } + Ok(IOResult::IO(io)) => Ok(IOResult::IO(io)), + } + } + + #[instrument(skip_all, level = Level::DEBUG)] + fn commit_dirty_pages_inner( + &self, + wal_auto_checkpoint_disabled: bool, + sync_mode: crate::SyncMode, + data_sync_retry: bool, ) -> Result> { let Some(wal) = self.wal.as_ref() else { return Err(LimboError::InternalError( @@ -1325,14 +1361,34 @@ impl Pager { let state = self.commit_info.state.get(); trace!(?state); match state { - CommitState::Start => { + CommitState::PrepareWal => { + let page_sz = self.page_size.get().expect("page size not set"); + let c = wal.borrow_mut().prepare_wal_start(page_sz)?; + let Some(c) = c else { + self.commit_info.state.set(CommitState::GetDbSize); + continue; + }; + self.commit_info.state.set(CommitState::PrepareWalSync); + if !c.is_completed() { + io_yield_one!(c); + } + } + CommitState::PrepareWalSync => { + let c = wal.borrow_mut().prepare_wal_finish()?; + self.commit_info.state.set(CommitState::GetDbSize); + if !c.is_completed() { + io_yield_one!(c); + } + } + CommitState::GetDbSize => { + let db_size = return_if_io!(self.with_header(|header| header.database_size)); + self.commit_info.state.set(CommitState::PrepareFrames { + db_size: db_size.get(), + }); + } + CommitState::PrepareFrames { db_size } => { let now = self.io.now(); self.commit_info.time.set(now); - let db_size_after = { - self.io - .block(|| self.with_header(|header| header.database_size))? - .get() - }; let dirty_ids: Vec = self.dirty_pages.read().iter().copied().collect(); if dirty_ids.is_empty() { @@ -1364,7 +1420,7 @@ impl Pager { if end_of_chunk { let commit_flag = if i == total - 1 { // Only the commit frame (final) frame carries the db_size - Some(db_size_after) + Some(db_size) } else { None }; @@ -1473,7 +1529,7 @@ impl Pager { let mut completions = self.commit_info.completions.borrow_mut(); if completions.iter().all(|c| c.is_completed()) { completions.clear(); - self.commit_info.state.set(CommitState::Start); + self.commit_info.state.set(CommitState::PrepareWal); wal.borrow_mut().finish_append_frames_commit()?; let result = self.commit_info.result.borrow_mut().take(); return Ok(IOResult::Done(result.expect("commit result should be set"))); @@ -1645,15 +1701,20 @@ impl Pager { } #[instrument(skip_all, level = Level::DEBUG)] - pub fn wal_checkpoint(&self, mode: CheckpointMode) -> Result { + pub fn wal_checkpoint_start(&self, mode: CheckpointMode) -> Result> { let Some(wal) = self.wal.as_ref() else { return Err(LimboError::InternalError( "wal_checkpoint() called on database without WAL".to_string(), )); }; - let mut checkpoint_result = self.io.block(|| wal.borrow_mut().checkpoint(self, mode))?; + wal.borrow_mut().checkpoint(self, mode) + } + pub fn wal_checkpoint_finish( + &self, + checkpoint_result: &mut CheckpointResult, + ) -> Result> { 'ensure_sync: { if checkpoint_result.num_backfilled != 0 { if checkpoint_result.everything_backfilled() { @@ -1664,27 +1725,35 @@ impl Pager { let page_size = self.page_size.get().unwrap_or_default(); let expected = (db_size * page_size.get()) as u64; if expected < self.db_file.size()? { - self.io.wait_for_completion(self.db_file.truncate( - expected as usize, - Completion::new_trunc(move |_| { - tracing::trace!( - "Database file truncated to expected size: {} bytes", - expected - ); - }), - )?)?; - self.io - .wait_for_completion(self.db_file.sync(Completion::new_sync( - move |_| { - tracing::trace!("Database file syncd after truncation"); - }, - ))?)?; + if !checkpoint_result.db_truncate_sent { + let c = self.db_file.truncate( + expected as usize, + Completion::new_trunc(move |_| { + tracing::trace!( + "Database file truncated to expected size: {} bytes", + expected + ); + }), + )?; + checkpoint_result.db_truncate_sent = true; + io_yield_one!(c); + } + if !checkpoint_result.db_sync_sent { + let c = self.db_file.sync(Completion::new_sync(move |_| { + tracing::trace!("Database file syncd after truncation"); + }))?; + checkpoint_result.db_sync_sent = true; + io_yield_one!(c); + } break 'ensure_sync; } } - // if we backfilled at all, we have to sync the db-file here - self.io - .wait_for_completion(self.db_file.sync(Completion::new_sync(move |_| {}))?)?; + if !checkpoint_result.db_sync_sent { + // if we backfilled at all, we have to sync the db-file here + let c = self.db_file.sync(Completion::new_sync(move |_| {}))?; + checkpoint_result.db_sync_sent = true; + io_yield_one!(c); + } } } checkpoint_result.release_guard(); @@ -1693,7 +1762,14 @@ impl Pager { .write() .clear() .map_err(|e| LimboError::InternalError(format!("Failed to clear page cache: {e:?}")))?; - Ok(checkpoint_result) + Ok(IOResult::Done(())) + } + + #[instrument(skip_all, level = Level::DEBUG)] + pub fn wal_checkpoint(&self, mode: CheckpointMode) -> Result { + let mut result = self.io.block(|| self.wal_checkpoint_start(mode))?; + self.io.block(|| self.wal_checkpoint_finish(&mut result))?; + Ok(result) } pub fn freepage_list(&self) -> u32 { @@ -2179,7 +2255,7 @@ impl Pager { fn reset_internal_states(&self) { *self.checkpoint_state.write() = CheckpointState::Checkpoint; self.syncing.replace(false); - self.commit_info.state.set(CommitState::Start); + self.commit_info.state.set(CommitState::PrepareWal); self.commit_info.time.set(self.io.now()); self.allocate_page_state.replace(AllocatePageState::Start); self.free_page_state.replace(FreePageState::Start); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 1617061cd..d2cb3e952 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -24,8 +24,8 @@ use crate::storage::sqlite3_ondisk::{ }; use crate::types::{IOCompletions, IOResult}; use crate::{ - bail_corrupt_error, io_yield_many, turso_assert, Buffer, Completion, CompletionError, - IOContext, LimboError, Result, + bail_corrupt_error, io_yield_many, io_yield_one, return_if_io, turso_assert, Buffer, + Completion, CompletionError, IOContext, LimboError, Result, }; #[derive(Debug, Clone, Default)] @@ -38,6 +38,8 @@ pub struct CheckpointResult { /// In the case of everything backfilled, we need to hold the locks until the db /// file is truncated. maybe_guard: Option, + pub db_truncate_sent: bool, + pub db_sync_sent: bool, } impl Drop for CheckpointResult { @@ -53,6 +55,8 @@ impl CheckpointResult { num_backfilled: n_ckpt, max_frame, maybe_guard: None, + db_sync_sent: false, + db_truncate_sent: false, } } @@ -265,18 +269,16 @@ pub trait Wal: Debug { page: &[u8], ) -> Result<()>; - /// Write a frame to the WAL. - /// db_size is the database size in pages after the transaction finishes. - /// db_size > 0 -> last frame written in transaction - /// db_size == 0 -> non-last frame written in transaction - /// write_counter is the counter we use to track when the I/O operation starts and completes - fn append_frame( - &mut self, - page: PageRef, - page_size: PageSize, - db_size: u32, - ) -> Result; + /// Prepare WAL header for the future append + /// Most of the time this method will return Ok(None) + fn prepare_wal_start(&mut self, page_sz: PageSize) -> Result>; + fn prepare_wal_finish(&mut self) -> Result; + + /// Write a bunch of frames to the WAL. + /// db_size is the database size in pages after the transaction finishes. + /// db_size is set -> last frame written in transaction + /// db_size is none -> non-last frame written in transaction fn append_frames_vectored( &mut self, pages: Vec, @@ -316,11 +318,16 @@ pub trait Wal: Debug { fn as_any(&self) -> &dyn std::any::Any; } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Clone)] pub enum CheckpointState { Start, Processing, - Done, + Finalize, + Truncate { + checkpoint_result: Option, + truncate_sent: bool, + sync_sent: bool, + }, } /// IOV_MAX is 1024 on most systems, lets use 512 to be safe @@ -1290,90 +1297,6 @@ impl Wal for WalFile { Ok(()) } - /// Write a frame to the WAL. - #[instrument(skip_all, level = Level::DEBUG)] - fn append_frame( - &mut self, - page: PageRef, - page_size: PageSize, - db_size: u32, - ) -> Result { - self.ensure_header_if_needed(page_size)?; - let shared_page_size = { - let shared = self.get_shared(); - let page_size = shared.wal_header.lock().page_size; - page_size - }; - turso_assert!( - shared_page_size == page_size.get(), - "page size mismatch - tried to change page size after WAL header was already initialized: shared.page_size={shared_page_size}, page_size={}", - page_size.get() - ); - let page_id = page.get().id; - let frame_id = self.max_frame + 1; - let offset = self.frame_offset(frame_id); - tracing::debug!(frame_id, offset, page_id); - let (c, checksums) = { - let shared = self.get_shared(); - let shared_file = self.shared.clone(); - let header = shared.wal_header.lock(); - let checksums = self.last_checksum; - let page_content = page.get_contents(); - let page_buf = page_content.as_ptr(); - - let io_ctx = self.io_ctx.borrow(); - let encrypted_data; - let data_to_write = match &io_ctx.encryption_or_checksum() { - EncryptionOrChecksum::Encryption(ctx) => { - encrypted_data = ctx.encrypt_page(page_buf, page_id)?; - encrypted_data.as_slice() - } - EncryptionOrChecksum::Checksum(ctx) => { - ctx.add_checksum_to_page(page_buf, page_id)?; - page_buf - } - EncryptionOrChecksum::None => page_buf, - }; - - let (frame_checksums, frame_bytes) = prepare_wal_frame( - &self.buffer_pool, - &header, - checksums, - header.page_size, - page_id as u32, - db_size, - data_to_write, - ); - - let c = Completion::new_write({ - let frame_bytes = frame_bytes.clone(); - move |res: Result| { - let Ok(bytes_written) = res else { - return; - }; - let frame_len = frame_bytes.len(); - turso_assert!( - bytes_written == frame_len as i32, - "wrote({bytes_written}) != expected({frame_len})" - ); - - page.clear_dirty(); - let seq = shared_file.read().epoch.load(Ordering::Acquire); - page.set_wal_tag(frame_id, seq); - } - }); - assert!( - shared.enabled.load(Ordering::Relaxed), - "WAL must be enabled" - ); - let file = shared.file.as_ref().unwrap(); - let result = file.pwrite(offset, frame_bytes.clone(), c)?; - (result, frame_checksums) - }; - self.complete_append_frame(page_id as u64, frame_id, checksums); - Ok(c) - } - #[instrument(skip_all, level = Level::DEBUG)] fn should_checkpoint(&self) -> bool { let shared = self.get_shared(); @@ -1487,6 +1410,65 @@ impl Wal for WalFile { Ok(pages) } + fn prepare_wal_start(&mut self, page_size: PageSize) -> Result> { + if self.get_shared().is_initialized()? { + return Ok(None); + } + tracing::debug!("ensure_header_if_needed"); + self.last_checksum = { + let mut shared = self.get_shared_mut(); + let checksum = { + let mut hdr = shared.wal_header.lock(); + hdr.magic = if cfg!(target_endian = "big") { + WAL_MAGIC_BE + } else { + WAL_MAGIC_LE + }; + if hdr.page_size == 0 { + hdr.page_size = page_size.get(); + } + if hdr.salt_1 == 0 && hdr.salt_2 == 0 { + hdr.salt_1 = self.io.generate_random_number() as u32; + hdr.salt_2 = self.io.generate_random_number() as u32; + } + + // recompute header checksum + let prefix = &hdr.as_bytes()[..WAL_HEADER_SIZE - 8]; + let use_native = (hdr.magic & 1) != 0; + let (c1, c2) = checksum_wal(prefix, &hdr, (0, 0), use_native); + hdr.checksum_1 = c1; + hdr.checksum_2 = c2; + (c1, c2) + }; + shared.last_checksum = checksum; + checksum + }; + + self.max_frame = 0; + let shared = self.get_shared(); + assert!( + shared.enabled.load(Ordering::Relaxed), + "WAL must be enabled" + ); + let file = shared.file.as_ref().unwrap(); + let c = sqlite3_ondisk::begin_write_wal_header(file, &shared.wal_header.lock())?; + Ok(Some(c)) + } + + fn prepare_wal_finish(&mut self) -> Result { + let shared = self.get_shared(); + assert!( + shared.enabled.load(Ordering::Relaxed), + "WAL must be enabled" + ); + let file = shared.file.as_ref().unwrap(); + let shared = self.shared.clone(); + let c = file.sync(Completion::new_sync(move |_| { + shared.read().initialized.store(true, Ordering::Release); + }))?; + Ok(c) + } + /// Use pwritev to append many frames to the log at once fn append_frames_vectored( &mut self, @@ -1498,7 +1480,10 @@ impl Wal for WalFile { pages.len() <= IOV_MAX, "we limit number of iovecs to IOV_MAX" ); - self.ensure_header_if_needed(page_sz)?; + turso_assert!( + self.get_shared().is_initialized()?, + "WAL must be prepared with prepare_wal_start/prepare_wal_finish method" + ); let (header, shared_page_size, epoch) = { let shared = self.get_shared(); @@ -1711,54 +1696,12 @@ impl WalFile { /// the WAL file has been truncated and we are writing the first /// frame since then. We need to ensure that the header is initialized. fn ensure_header_if_needed(&mut self, page_size: PageSize) -> Result<()> { - if self.get_shared().is_initialized()? { + let Some(c) = self.prepare_wal_start(page_size)? else { return Ok(()); - } - tracing::debug!("ensure_header_if_needed"); - self.last_checksum = { - let mut shared = self.get_shared_mut(); - let checksum = { - let mut hdr = shared.wal_header.lock(); - hdr.magic = if cfg!(target_endian = "big") { - WAL_MAGIC_BE - } else { - WAL_MAGIC_LE - }; - if hdr.page_size == 0 { - hdr.page_size = page_size.get(); - } - if hdr.salt_1 == 0 && hdr.salt_2 == 0 { - hdr.salt_1 = self.io.generate_random_number() as u32; - hdr.salt_2 = self.io.generate_random_number() as u32; - } - - // recompute header checksum - let prefix = &hdr.as_bytes()[..WAL_HEADER_SIZE - 8]; - let use_native = (hdr.magic & 1) != 0; - let (c1, c2) = checksum_wal(prefix, &hdr, (0, 0), use_native); - hdr.checksum_1 = c1; - hdr.checksum_2 = c2; - (c1, c2) - }; - shared.last_checksum = checksum; - checksum }; - - self.max_frame = 0; - let shared = self.get_shared(); - assert!( - shared.enabled.load(Ordering::Relaxed), - "WAL must be enabled" - ); - let file = shared.file.as_ref().unwrap(); - self.io - .wait_for_completion(sqlite3_ondisk::begin_write_wal_header( - file, - &shared.wal_header.lock(), - )?)?; - self.io - .wait_for_completion(file.sync(Completion::new_sync(|_| {}))?)?; - shared.initialized.store(true, Ordering::Release); + self.io.wait_for_completion(c)?; + let c = self.prepare_wal_finish()?; + self.io.wait_for_completion(c)?; Ok(()) } @@ -1768,7 +1711,7 @@ impl WalFile { mode: CheckpointMode, ) -> Result> { loop { - let state = self.ongoing_checkpoint.state; + let state = &mut self.ongoing_checkpoint.state; tracing::debug!(?state); match state { // Acquire the relevant exclusive locks and checkpoint_lock @@ -1790,6 +1733,8 @@ impl WalFile { num_backfilled: self.prev_checkpoint.num_backfilled, max_frame: nbackfills, maybe_guard: None, + db_sync_sent: false, + db_truncate_sent: false, })); } // acquire the appropriate exclusive locks depending on the checkpoint mode @@ -1942,19 +1887,19 @@ impl WalFile { if !completions.is_empty() { io_yield_many!(completions); } else if self.ongoing_checkpoint.complete() { - self.ongoing_checkpoint.state = CheckpointState::Done; + self.ongoing_checkpoint.state = CheckpointState::Finalize; } } // All eligible frames copied to the db file // Update nBackfills // In Restart or Truncate mode, we need to restart the log over and possibly truncate the file // Release all locks and return the current num of wal frames and the amount we backfilled - CheckpointState::Done => { + CheckpointState::Finalize => { turso_assert!( self.ongoing_checkpoint.complete(), "checkpoint pending flush must have finished" ); - let mut checkpoint_result = { + let checkpoint_result = { let shared = self.get_shared(); let current_mx = shared.max_frame.load(Ordering::Acquire); let nbackfills = shared.nbackfills.load(Ordering::Acquire); @@ -2002,6 +1947,28 @@ impl WalFile { if mode.should_restart_log() { self.restart_log(mode)?; } + self.ongoing_checkpoint.state = CheckpointState::Truncate { + checkpoint_result: Some(checkpoint_result), + truncate_sent: false, + sync_sent: false, + }; + } + CheckpointState::Truncate { .. } => { + if matches!(mode, CheckpointMode::Truncate { .. }) { + return_if_io!(self.truncate_log()); + } + if mode.should_restart_log() { + Self::unlock_after_restart(&self.shared, None); + } + let mut checkpoint_result = { + let CheckpointState::Truncate { + checkpoint_result, .. + } = &mut self.ongoing_checkpoint.state + else { + panic!("unxpected state"); + }; + checkpoint_result.take().unwrap() + }; // increment wal epoch to ensure no stale pages are used for backfilling self.get_shared().epoch.fetch_add(1, Ordering::Release); @@ -2128,62 +2095,89 @@ impl WalFile { } } - let unlock = |e: Option<&LimboError>| { - // release all read locks we just acquired, the caller will take care of the others - let shared = self.shared.write(); - for idx in 1..shared.read_locks.len() { - shared.read_locks[idx].unlock(); - } - if let Some(e) = e { - tracing::error!( - "Failed to restart WAL header: {:?}, releasing read locks", - e - ); - } - }; // reinitialize in‑memory state - self.get_shared_mut() - .restart_wal_header(&self.io, mode) - .inspect_err(|e| { - unlock(Some(e)); - })?; + self.get_shared_mut().restart_wal_header(&self.io, mode); let cksm = self.get_shared().last_checksum; self.last_checksum = cksm; self.max_frame = 0; self.min_frame = 0; self.checkpoint_seq.fetch_add(1, Ordering::Release); + Ok(()) + } - // For TRUNCATE mode: shrink the WAL file to 0 B - if matches!(mode, CheckpointMode::Truncate { .. }) { - let c = Completion::new_trunc(|_| { - tracing::trace!("WAL file truncated to 0 B"); - }); + fn truncate_log(&mut self) -> Result> { + let file = { let shared = self.get_shared(); - // for now at least, lets do all this IO syncronously assert!( shared.enabled.load(Ordering::Relaxed), "WAL must be enabled" ); - let file = shared.file.as_ref().unwrap(); - let c = file.truncate(0, c).inspect_err(|e| unlock(Some(e)))?; shared.initialized.store(false, Ordering::Release); - self.io - .wait_for_completion(c) - .inspect_err(|e| unlock(Some(e)))?; - // fsync after truncation - self.io - .wait_for_completion( - file.sync(Completion::new_sync(|_| { - tracing::trace!("WAL file synced after reset/truncation"); - })) - .inspect_err(|e| unlock(Some(e)))?, - ) - .inspect_err(|e| unlock(Some(e)))?; - } + shared.file.as_ref().unwrap().clone() + }; - // release read‑locks 1..4 - unlock(None); - Ok(()) + let CheckpointState::Truncate { + sync_sent, + truncate_sent, + .. + } = &mut self.ongoing_checkpoint.state + else { + panic!("unxpected state"); + }; + if !*truncate_sent { + // For TRUNCATE mode: shrink the WAL file to 0 B + + let c = Completion::new_trunc({ + let shared = self.shared.clone(); + move |result| { + if let Err(err) = result { + Self::unlock_after_restart( + &shared, + Some(&LimboError::InternalError(err.to_string())), + ); + } else { + tracing::trace!("WAL file truncated to 0 B"); + } + } + }); + let c = file + .truncate(0, c) + .inspect_err(|e| Self::unlock_after_restart(&self.shared, Some(e)))?; + *truncate_sent = true; + io_yield_one!(c); + } else if !*sync_sent { + let shared = self.shared.clone(); + let c = file + .sync(Completion::new_sync(move |result| { + if let Err(err) = result { + Self::unlock_after_restart( + &shared, + Some(&LimboError::InternalError(err.to_string())), + ); + } else { + tracing::trace!("WAL file synced after reset/truncation"); + } + })) + .inspect_err(|e| Self::unlock_after_restart(&self.shared, Some(e)))?; + *sync_sent = true; + io_yield_one!(c); + } + Ok(IOResult::Done(())) + } + + // unlock shared read locks taken by RESTART/TRUNCATE checkpoint modes + fn unlock_after_restart(shared: &Arc>, e: Option<&LimboError>) { + // release all read locks we just acquired, the caller will take care of the others + let shared = shared.write(); + for idx in 1..shared.read_locks.len() { + shared.read_locks[idx].unlock(); + } + if let Some(e) = e { + tracing::error!( + "Failed to restart WAL header: {:?}, releasing read locks", + e + ); + } } fn acquire_proper_checkpoint_guard(&mut self, mode: CheckpointMode) -> Result<()> { @@ -2401,7 +2395,7 @@ impl WalFileShared { /// This function updates the shared-memory structures so that the next /// client to write to the database (which may be this one) does so by /// writing frames into the start of the log file. - fn restart_wal_header(&mut self, io: &Arc, mode: CheckpointMode) -> Result<()> { + fn restart_wal_header(&mut self, io: &Arc, mode: CheckpointMode) { turso_assert!( matches!( mode, @@ -2428,7 +2422,6 @@ impl WalFileShared { for lock in &self.read_locks[2..] { lock.set_value_exclusive(READMARK_NOT_USED); } - Ok(()) } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 366e8be56..82025f40e 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -8,13 +8,13 @@ use crate::storage::btree::{ use crate::storage::database::DatabaseFile; use crate::storage::page_cache::PageCache; use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState}; -use crate::storage::sqlite3_ondisk::{read_varint, DatabaseHeader}; +use crate::storage::sqlite3_ondisk::{read_varint, DatabaseHeader, PageSize}; use crate::translate::collate::CollationSeq; use crate::types::{ compare_immutable, compare_records_generic, Extendable, IOCompletions, ImmutableRecord, SeekResult, Text, }; -use crate::util::{normalize_ident, IOExt as _}; +use crate::util::normalize_ident; use crate::vdbe::insn::InsertFlags; use crate::vdbe::registers_to_ref_values; use crate::vector::{vector_concat, vector_slice}; @@ -330,12 +330,35 @@ pub fn op_bit_not( Ok(InsnFunctionStepResult::Step) } +#[derive(Debug)] +pub enum OpCheckpointState { + StartCheckpoint, + FinishCheckpoint { result: Option }, + CompleteResult { result: Result }, +} + pub fn op_checkpoint( program: &Program, state: &mut ProgramState, insn: &Insn, pager: &Arc, mv_store: Option<&Arc>, +) -> Result { + match op_checkpoint_inner(program, state, insn, pager, mv_store) { + Ok(result) => Ok(result), + Err(err) => { + state.op_checkpoint_state = OpCheckpointState::StartCheckpoint; + Err(err) + } + } +} + +pub fn op_checkpoint_inner( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Arc, + mv_store: Option<&Arc>, ) -> Result { load_insn!( Checkpoint { @@ -352,26 +375,75 @@ pub fn op_checkpoint( // however. return Err(LimboError::TableLocked); } - let result = program.connection.checkpoint(*checkpoint_mode); - match result { - Ok(CheckpointResult { - num_attempted, - num_backfilled, - .. - }) => { - // https://sqlite.org/pragma.html#pragma_wal_checkpoint - // 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy). - state.registers[*dest] = Register::Value(Value::Integer(0)); - // 2nd col: # modified pages written to wal file - state.registers[*dest + 1] = Register::Value(Value::Integer(num_attempted as i64)); - // 3rd col: # pages moved to db after checkpoint - state.registers[*dest + 2] = Register::Value(Value::Integer(num_backfilled as i64)); - } - Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)), - } + loop { + match &mut state.op_checkpoint_state { + OpCheckpointState::StartCheckpoint => { + let step_result = program + .connection + .pager + .borrow_mut() + .wal_checkpoint_start(*checkpoint_mode); + match step_result { + Ok(IOResult::Done(result)) => { + state.op_checkpoint_state = OpCheckpointState::FinishCheckpoint { + result: Some(result), + }; + continue; + } + Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), + Err(err) => { + state.op_checkpoint_state = + OpCheckpointState::CompleteResult { result: Err(err) }; + continue; + } + } + } + OpCheckpointState::FinishCheckpoint { result } => { + let step_result = program + .connection + .pager + .borrow_mut() + .wal_checkpoint_finish(result.as_mut().unwrap()); + match step_result { + Ok(IOResult::Done(())) => { + state.op_checkpoint_state = OpCheckpointState::CompleteResult { + result: Ok(result.take().unwrap()), + }; + continue; + } + Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), + Err(err) => { + state.op_checkpoint_state = + OpCheckpointState::CompleteResult { result: Err(err) }; + continue; + } + } + } + OpCheckpointState::CompleteResult { result } => { + match result { + Ok(CheckpointResult { + num_attempted, + num_backfilled, + .. + }) => { + // https://sqlite.org/pragma.html#pragma_wal_checkpoint + // 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy). + state.registers[*dest] = Register::Value(Value::Integer(0)); + // 2nd col: # modified pages written to wal file + state.registers[*dest + 1] = + Register::Value(Value::Integer(*num_attempted as i64)); + // 3rd col: # pages moved to db after checkpoint + state.registers[*dest + 2] = + Register::Value(Value::Integer(*num_backfilled as i64)); + } + Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)), + } - state.pc += 1; - Ok(InsnFunctionStepResult::Step) + state.pc += 1; + return Ok(InsnFunctionStepResult::Step); + } + } + } } pub fn op_null( @@ -2080,7 +2152,29 @@ pub fn op_halt_if_null( } } +#[derive(Debug, Clone, Copy)] +pub enum OpTransactionState { + Start, + CheckSchemaCookie, +} + pub fn op_transaction( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Arc, + mv_store: Option<&Arc>, +) -> Result { + match op_transaction_inner(program, state, insn, pager, mv_store) { + Ok(result) => Ok(result), + Err(err) => { + state.op_transaction_state = OpTransactionState::Start; + Err(err) + } + } +} + +pub fn op_transaction_inner( program: &Program, state: &mut ProgramState, insn: &Insn, @@ -2095,176 +2189,188 @@ pub fn op_transaction( }, insn ); - let conn = program.connection.clone(); - let write = matches!(tx_mode, TransactionMode::Write); - if write && conn._db.open_flags.contains(OpenFlags::ReadOnly) { - return Err(LimboError::ReadOnly); - } let pager = program.get_pager_from_database_index(db); - - // 1. We try to upgrade current version - let current_state = conn.transaction_state.get(); - let (new_transaction_state, updated) = if conn.is_nested_stmt.get() { - (current_state, false) - } else { - match (current_state, write) { - // pending state means that we tried beginning a tx and the method returned IO. - // instead of ending the read tx, just update the state to pending. - (TransactionState::PendingUpgrade, write) => { - turso_assert!( - write, - "pending upgrade should only be set for write transactions" - ); - ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ) - } - (TransactionState::Write { schema_did_change }, true) => { - (TransactionState::Write { schema_did_change }, false) - } - (TransactionState::Write { schema_did_change }, false) => { - (TransactionState::Write { schema_did_change }, false) - } - (TransactionState::Read, true) => ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ), - (TransactionState::Read, false) => (TransactionState::Read, false), - (TransactionState::None, true) => ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ), - (TransactionState::None, false) => (TransactionState::Read, true), - } - }; - - // 2. Start transaction if needed - if let Some(mv_store) = &mv_store { - // In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed. - // Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough - // for both. - if program.connection.mv_tx.get().is_none() { - // We allocate the first page lazily in the first transaction. - // TODO: when we fix MVCC enable schema cookie detection for reprepare statements - // let header_schema_cookie = pager - // .io - // .block(|| pager.with_header(|header| header.schema_cookie.get()))?; - // if header_schema_cookie != *schema_cookie { - // return Err(LimboError::SchemaUpdated); - // } - let tx_id = match tx_mode { - TransactionMode::None | TransactionMode::Read | TransactionMode::Concurrent => { - mv_store.begin_tx(pager.clone())? + loop { + match state.op_transaction_state { + OpTransactionState::Start => { + let conn = program.connection.clone(); + let write = matches!(tx_mode, TransactionMode::Write); + if write && conn._db.open_flags.contains(OpenFlags::ReadOnly) { + return Err(LimboError::ReadOnly); } - TransactionMode::Write => { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None)) - } - }; - program.connection.mv_tx.set(Some((tx_id, *tx_mode))); - } else if updated { - // TODO: fix tx_mode in Insn::Transaction, now each statement overrides it even if there's already a CONCURRENT Tx in progress, for example - let mv_tx_mode = program.connection.mv_tx.get().unwrap().1; - let actual_tx_mode = if mv_tx_mode == TransactionMode::Concurrent { - TransactionMode::Concurrent - } else { - *tx_mode - }; - if matches!(new_transaction_state, TransactionState::Write { .. }) - && matches!(actual_tx_mode, TransactionMode::Write) - { - let (tx_id, mv_tx_mode) = program.connection.mv_tx.get().unwrap(); - if mv_tx_mode == TransactionMode::Read { - return_if_io!(mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id))); + + // 1. We try to upgrade current version + let current_state = conn.transaction_state.get(); + let (new_transaction_state, updated) = if conn.is_nested_stmt.get() { + (current_state, false) } else { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))); + match (current_state, write) { + // pending state means that we tried beginning a tx and the method returned IO. + // instead of ending the read tx, just update the state to pending. + (TransactionState::PendingUpgrade, write) => { + turso_assert!( + write, + "pending upgrade should only be set for write transactions" + ); + ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ) + } + (TransactionState::Write { schema_did_change }, true) => { + (TransactionState::Write { schema_did_change }, false) + } + (TransactionState::Write { schema_did_change }, false) => { + (TransactionState::Write { schema_did_change }, false) + } + (TransactionState::Read, true) => ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ), + (TransactionState::Read, false) => (TransactionState::Read, false), + (TransactionState::None, true) => ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ), + (TransactionState::None, false) => (TransactionState::Read, true), + } + }; + + // 2. Start transaction if needed + if let Some(mv_store) = &mv_store { + // In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed. + // Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough + // for both. + if program.connection.mv_tx.get().is_none() { + // We allocate the first page lazily in the first transaction. + // TODO: when we fix MVCC enable schema cookie detection for reprepare statements + // let header_schema_cookie = pager + // .io + // .block(|| pager.with_header(|header| header.schema_cookie.get()))?; + // if header_schema_cookie != *schema_cookie { + // return Err(LimboError::SchemaUpdated); + // } + let tx_id = match tx_mode { + TransactionMode::None + | TransactionMode::Read + | TransactionMode::Concurrent => mv_store.begin_tx(pager.clone())?, + TransactionMode::Write => { + return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None)) + } + }; + program.connection.mv_tx.set(Some((tx_id, *tx_mode))); + } else if updated { + // TODO: fix tx_mode in Insn::Transaction, now each statement overrides it even if there's already a CONCURRENT Tx in progress, for example + let mv_tx_mode = program.connection.mv_tx.get().unwrap().1; + let actual_tx_mode = if mv_tx_mode == TransactionMode::Concurrent { + TransactionMode::Concurrent + } else { + *tx_mode + }; + if matches!(new_transaction_state, TransactionState::Write { .. }) + && matches!(actual_tx_mode, TransactionMode::Write) + { + let (tx_id, mv_tx_mode) = program.connection.mv_tx.get().unwrap(); + if mv_tx_mode == TransactionMode::Read { + return_if_io!( + mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id)) + ); + } else { + return_if_io!( + mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id)) + ); + } + } + } + } else { + if matches!(tx_mode, TransactionMode::Concurrent) { + return Err(LimboError::TxError( + "Concurrent transaction mode is only supported when MVCC is enabled" + .to_string(), + )); + } + if updated && matches!(current_state, TransactionState::None) { + turso_assert!( + !conn.is_nested_stmt.get(), + "nested stmt should not begin a new read transaction" + ); + pager.begin_read_tx()?; + } + + if updated && matches!(new_transaction_state, TransactionState::Write { .. }) { + turso_assert!( + !conn.is_nested_stmt.get(), + "nested stmt should not begin a new write transaction" + ); + let begin_w_tx_res = pager.begin_write_tx(); + if let Err(LimboError::Busy) = begin_w_tx_res { + // We failed to upgrade to write transaction so put the transaction into its original state. + // That is, if the transaction had not started, end the read transaction so that next time we + // start a new one. + if matches!(current_state, TransactionState::None) { + pager.end_read_tx()?; + conn.transaction_state.replace(TransactionState::None); + } + assert_eq!(conn.transaction_state.get(), current_state); + return Err(LimboError::Busy); + } + if let IOResult::IO(io) = begin_w_tx_res? { + // set the transaction state to pending so we don't have to + // end the read transaction. + program + .connection + .transaction_state + .replace(TransactionState::PendingUpgrade); + return Ok(InsnFunctionStepResult::IO(io)); + } + } } - } - } - } else { - if matches!(tx_mode, TransactionMode::Concurrent) { - return Err(LimboError::TxError( - "Concurrent transaction mode is only supported when MVCC is enabled".to_string(), - )); - } - if updated && matches!(current_state, TransactionState::None) { - turso_assert!( - !conn.is_nested_stmt.get(), - "nested stmt should not begin a new read transaction" - ); - pager.begin_read_tx()?; - } - if updated && matches!(new_transaction_state, TransactionState::Write { .. }) { - turso_assert!( - !conn.is_nested_stmt.get(), - "nested stmt should not begin a new write transaction" - ); - let begin_w_tx_res = pager.begin_write_tx(); - if let Err(LimboError::Busy) = begin_w_tx_res { - // We failed to upgrade to write transaction so put the transaction into its original state. - // That is, if the transaction had not started, end the read transaction so that next time we - // start a new one. - if matches!(current_state, TransactionState::None) { - pager.end_read_tx()?; - conn.transaction_state.replace(TransactionState::None); + // 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error + if updated { + conn.transaction_state.replace(new_transaction_state); } - assert_eq!(conn.transaction_state.get(), current_state); - return Err(LimboError::Busy); + state.op_transaction_state = OpTransactionState::CheckSchemaCookie; + continue; } - if let IOResult::IO(io) = begin_w_tx_res? { - // set the transaction state to pending so we don't have to - // end the read transaction. - program - .connection - .transaction_state - .replace(TransactionState::PendingUpgrade); - return Ok(InsnFunctionStepResult::IO(io)); + // 4. Check whether schema has changed if we are actually going to access the database. + // Can only read header if page 1 has been allocated already + // begin_write_tx that happens, but not begin_read_tx + OpTransactionState::CheckSchemaCookie => { + let res = with_header(&pager, mv_store, program, |header| { + header.schema_cookie.get() + }); + match res { + Ok(IOResult::Done(header_schema_cookie)) => { + if header_schema_cookie != *schema_cookie { + tracing::debug!( + "schema changed, force reprepare: {} != {}", + header_schema_cookie, + *schema_cookie + ); + return Err(LimboError::SchemaUpdated); + } + } + Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), + // This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution + Err(LimboError::Page1NotAlloc) => {} + Err(err) => { + return Err(err); + } + } + + state.pc += 1; + return Ok(InsnFunctionStepResult::Step); } } } - - // 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error - if updated { - conn.transaction_state.replace(new_transaction_state); - } - - // 4. Check whether schema has changed if we are actually going to access the database. - // Can only read header if page 1 has been allocated already - // begin_write_tx that happens, but not begin_read_tx - // TODO: this is a hack to make the pager run the IO loop - let res = pager.io.block(|| { - with_header(&pager, mv_store, program, |header| { - header.schema_cookie.get() - }) - }); - match res { - Ok(header_schema_cookie) => { - if header_schema_cookie != *schema_cookie { - tracing::debug!( - "schema changed, force reprepare: {} != {}", - header_schema_cookie, - *schema_cookie - ); - return Err(LimboError::SchemaUpdated); - } - } - // This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution - Err(LimboError::Page1NotAlloc) => {} - Err(err) => { - return Err(err); - } - } - - state.pc += 1; - Ok(InsnFunctionStepResult::Step) } pub fn op_auto_commit( @@ -3891,14 +3997,17 @@ pub fn op_sorter_open( }, insn ); - let cache_size = program.connection.get_cache_size(); - // Set the buffer size threshold to be roughly the same as the limit configured for the page-cache. - let page_size = pager - .io - .block(|| pager.with_header(|header| header.page_size)) - .unwrap_or_default() - .get() as usize; + // be careful here - we must not use any async operations after pager.with_header because this op-code has no proper state-machine + let page_size = match pager.with_header(|header| header.page_size) { + Ok(IOResult::Done(page_size)) => page_size, + Err(_) => PageSize::default(), + Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), + }; + let page_size = page_size.get() as usize; + let cache_size = program.connection.get_cache_size(); + + // Set the buffer size threshold to be roughly the same as the limit configured for the page-cache. let max_buffer_size_bytes = if cache_size < 0 { (cache_size.abs() * 1024) as usize } else { @@ -7076,6 +7185,8 @@ pub fn op_open_ephemeral( match &mut state.op_open_ephemeral_state { OpOpenEphemeralState::Start => { tracing::trace!("Start"); + let page_size = + return_if_io!(with_header(pager, mv_store, program, |header| header.page_size)); let conn = program.connection.clone(); let io = conn.pager.borrow().io.clone(); let rand_num = io.generate_random_number(); @@ -7108,11 +7219,6 @@ pub fn op_open_ephemeral( db_file_io = io; } - let page_size = pager - .io - .block(|| with_header(pager, mv_store, program, |header| header.page_size))? - .get(); - let buffer_pool = program.connection._db.buffer_pool.clone(); let page_cache = Arc::new(RwLock::new(PageCache::default())); @@ -7126,11 +7232,6 @@ pub fn op_open_ephemeral( Arc::new(Mutex::new(())), )?); - let page_size = pager - .io - .block(|| with_header(&pager, mv_store, program, |header| header.page_size)) - .unwrap_or_default(); - pager.page_size.set(Some(page_size)); state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager }; diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 1ab249b03..0b4fd6c9c 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -35,8 +35,9 @@ use crate::{ types::{IOCompletions, IOResult, RawSlice, TextRef}, vdbe::{ execute::{ - OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState, - OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState, + OpCheckpointState, OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, + OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, + OpSeekState, OpTransactionState, }, metrics::StatementMetrics, }, @@ -290,6 +291,8 @@ pub struct ProgramState { current_collation: Option, op_column_state: OpColumnState, op_row_id_state: OpRowIdState, + op_transaction_state: OpTransactionState, + op_checkpoint_state: OpCheckpointState, /// State machine for committing view deltas with I/O handling view_delta_state: ViewDeltaCommitState, } @@ -333,6 +336,8 @@ impl ProgramState { current_collation: None, op_column_state: OpColumnState::Start, op_row_id_state: OpRowIdState::Start, + op_transaction_state: OpTransactionState::Start, + op_checkpoint_state: OpCheckpointState::StartCheckpoint, view_delta_state: ViewDeltaCommitState::NotStarted, } } diff --git a/tests/integration/query_processing/test_multi_thread.rs b/tests/integration/query_processing/test_multi_thread.rs index 0a3b6366e..6054c7a32 100644 --- a/tests/integration/query_processing/test_multi_thread.rs +++ b/tests/integration/query_processing/test_multi_thread.rs @@ -16,10 +16,16 @@ fn test_schema_reprepare() { let mut stmt = conn2.prepare("SELECT y, z FROM t").unwrap(); let mut stmt2 = conn2.prepare("SELECT x, z FROM t").unwrap(); conn1.execute("ALTER TABLE t DROP COLUMN x").unwrap(); - assert_eq!( - stmt2.step().unwrap_err().to_string(), - "Parse error: no such column: x" - ); + loop { + match stmt2.step() { + Ok(StepResult::IO) => tmp_db.io.step().unwrap(), + Err(err) => { + assert_eq!(err.to_string(), "Parse error: no such column: x"); + break; + } + r => panic!("unexpected response: {r:?}"), + } + } let mut rows = Vec::new(); loop {