From 9cc1d0fcc2d93415e49b9edc6c9a29943745f0ee Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 14:40:13 +0400 Subject: [PATCH 01/17] make append_frames fully async and non-blocking --- core/storage/pager.rs | 32 +++++++++- core/storage/wal.rs | 142 +++++++++++------------------------------- 2 files changed, 66 insertions(+), 108 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index bc0d92480..a099e438c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -316,6 +316,10 @@ impl Page { #[derive(Clone, Copy, Debug)] /// The state of the current pager cache commit. enum CommitState { + /// Prepare WAL header for commit if needed + PrepareWal, + /// Sync WAL header after prepare + PrepareWalSync, /// Appends all frames to the WAL. Start, /// Fsync the on-disk WAL. @@ -581,7 +585,7 @@ impl Pager { commit_info: CommitInfo { result: RefCell::new(None), completions: RefCell::new(Vec::new()), - state: CommitState::Start.into(), + state: CommitState::PrepareWal.into(), time: now.into(), }, syncing: Rc::new(Cell::new(false)), @@ -1258,6 +1262,13 @@ impl Pager { let mut completions: Vec = Vec::new(); let mut pages = Vec::with_capacity(len); let page_sz = self.page_size.get().unwrap_or_default(); + + if let Some(c) = wal.borrow_mut().prepare_wal_start(page_sz)? { + self.io.wait_for_completion(c)?; + let c = wal.borrow_mut().prepare_wal_finish()?; + self.io.wait_for_completion(c)?; + } + let commit_frame = None; // cacheflush only so we are not setting a commit frame here for (idx, page_id) in dirty_pages.iter().enumerate() { let page = { @@ -1325,6 +1336,25 @@ impl Pager { let state = self.commit_info.state.get(); trace!(?state); match state { + CommitState::PrepareWal => { + let page_sz = self.page_size.get().expect("page size not set"); + let c = wal.borrow_mut().prepare_wal_start(page_sz)?; + let Some(c) = c else { + self.commit_info.state.set(CommitState::Start); + continue; + }; + self.commit_info.state.set(CommitState::PrepareWalSync); + if !c.is_completed() { + io_yield_one!(c); + } + } + CommitState::PrepareWalSync => { + let c = wal.borrow_mut().prepare_wal_finish()?; + self.commit_info.state.set(CommitState::Start); + if !c.is_completed() { + io_yield_one!(c); + } + } CommitState::Start => { let now = self.io.now(); self.commit_info.time.set(now); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index aeddca4d5..be77c7890 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -265,18 +265,16 @@ pub trait Wal: Debug { page: &[u8], ) -> Result<()>; - /// Write a frame to the WAL. - /// db_size is the database size in pages after the transaction finishes. - /// db_size > 0 -> last frame written in transaction - /// db_size == 0 -> non-last frame written in transaction - /// write_counter is the counter we use to track when the I/O operation starts and completes - fn append_frame( - &mut self, - page: PageRef, - page_size: PageSize, - db_size: u32, - ) -> Result; + /// Prepare WAL header for the future append + /// Most of the time this method will return Ok(None) + fn prepare_wal_start(&mut self, page_sz: PageSize) -> Result>; + fn prepare_wal_finish(&mut self) -> Result; + + /// Write a bunch of frames to the WAL. + /// db_size is the database size in pages after the transaction finishes. + /// db_size is set -> last frame written in transaction + /// db_size is none -> non-last frame written in transaction fn append_frames_vectored( &mut self, pages: Vec, @@ -1288,90 +1286,6 @@ impl Wal for WalFile { Ok(()) } - /// Write a frame to the WAL. - #[instrument(skip_all, level = Level::DEBUG)] - fn append_frame( - &mut self, - page: PageRef, - page_size: PageSize, - db_size: u32, - ) -> Result { - self.ensure_header_if_needed(page_size)?; - let shared_page_size = { - let shared = self.get_shared(); - let page_size = shared.wal_header.lock().page_size; - page_size - }; - turso_assert!( - shared_page_size == page_size.get(), - "page size mismatch - tried to change page size after WAL header was already initialized: shared.page_size={shared_page_size}, page_size={}", - page_size.get() - ); - let page_id = page.get().id; - let frame_id = self.max_frame + 1; - let offset = self.frame_offset(frame_id); - tracing::debug!(frame_id, offset, page_id); - let (c, checksums) = { - let shared = self.get_shared(); - let shared_file = self.shared.clone(); - let header = shared.wal_header.lock(); - let checksums = self.last_checksum; - let page_content = page.get_contents(); - let page_buf = page_content.as_ptr(); - - let io_ctx = self.io_ctx.borrow(); - let encrypted_data; - let data_to_write = match &io_ctx.encryption_or_checksum() { - EncryptionOrChecksum::Encryption(ctx) => { - encrypted_data = ctx.encrypt_page(page_buf, page_id)?; - encrypted_data.as_slice() - } - EncryptionOrChecksum::Checksum(ctx) => { - ctx.add_checksum_to_page(page_buf, page_id)?; - page_buf - } - EncryptionOrChecksum::None => page_buf, - }; - - let (frame_checksums, frame_bytes) = prepare_wal_frame( - &self.buffer_pool, - &header, - checksums, - header.page_size, - page_id as u32, - db_size, - data_to_write, - ); - - let c = Completion::new_write({ - let frame_bytes = frame_bytes.clone(); - move |res: Result| { - let Ok(bytes_written) = res else { - return; - }; - let frame_len = frame_bytes.len(); - turso_assert!( - bytes_written == frame_len as i32, - "wrote({bytes_written}) != expected({frame_len})" - ); - - page.clear_dirty(); - let seq = shared_file.read().epoch.load(Ordering::Acquire); - page.set_wal_tag(frame_id, seq); - } - }); - assert!( - shared.enabled.load(Ordering::Relaxed), - "WAL must be enabled" - ); - let file = shared.file.as_ref().unwrap(); - let result = file.pwrite(offset, frame_bytes.clone(), c)?; - (result, frame_checksums) - }; - self.complete_append_frame(page_id as u64, frame_id, checksums); - Ok(c) - } - #[instrument(skip_all, level = Level::DEBUG)] fn should_checkpoint(&self) -> bool { let shared = self.get_shared(); @@ -1485,6 +1399,24 @@ impl Wal for WalFile { Ok(pages) } + fn prepare_wal_start(&mut self, page_sz: PageSize) -> Result> { + self.ensure_header_if_needed(page_sz) + } + + fn prepare_wal_finish(&mut self) -> Result { + let shared = self.get_shared(); + assert!( + shared.enabled.load(Ordering::Relaxed), + "WAL must be enabled" + ); + let file = shared.file.as_ref().unwrap(); + let shared = self.shared.clone(); + let c = file.sync(Completion::new_sync(move |_| { + shared.read().initialized.store(true, Ordering::Release); + }))?; + Ok(c) + } + /// Use pwritev to append many frames to the log at once fn append_frames_vectored( &mut self, @@ -1496,7 +1428,10 @@ impl Wal for WalFile { pages.len() <= IOV_MAX, "we limit number of iovecs to IOV_MAX" ); - self.ensure_header_if_needed(page_sz)?; + turso_assert!( + self.get_shared().is_initialized()?, + "WAL must be prepared with prepare_append method" + ); let (header, shared_page_size, epoch) = { let shared = self.get_shared(); @@ -1708,9 +1643,9 @@ impl WalFile { /// the WAL file has been truncated and we are writing the first /// frame since then. We need to ensure that the header is initialized. - fn ensure_header_if_needed(&mut self, page_size: PageSize) -> Result<()> { + fn ensure_header_if_needed(&mut self, page_size: PageSize) -> Result> { if self.get_shared().is_initialized()? { - return Ok(()); + return Ok(None); } tracing::debug!("ensure_header_if_needed"); self.last_checksum = { @@ -1749,15 +1684,8 @@ impl WalFile { "WAL must be enabled" ); let file = shared.file.as_ref().unwrap(); - self.io - .wait_for_completion(sqlite3_ondisk::begin_write_wal_header( - file, - &shared.wal_header.lock(), - )?)?; - self.io - .wait_for_completion(file.sync(Completion::new_sync(|_| {}))?)?; - shared.initialized.store(true, Ordering::Release); - Ok(()) + let c = sqlite3_ondisk::begin_write_wal_header(file, &shared.wal_header.lock())?; + Ok(Some(c)) } fn checkpoint_inner( From e25028551350155254b35b1e1e31c7a4b11c3e4e Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 14:40:44 +0400 Subject: [PATCH 02/17] make op_transaction non-blocking --- core/vdbe/execute.rs | 354 ++++++++++++++++++++++++------------------- core/vdbe/mod.rs | 3 + 2 files changed, 198 insertions(+), 159 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index e8ba7aa3e..dfcc53994 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2080,7 +2080,29 @@ pub fn op_halt_if_null( } } +#[derive(Debug, Clone, Copy)] +pub enum OpTransactionState { + Start, + CheckSchemaCookie, +} + pub fn op_transaction( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Arc, + mv_store: Option<&Arc>, +) -> Result { + match op_transaction_inner(program, state, insn, pager, mv_store) { + Ok(result) => Ok(result), + Err(err) => { + state.op_transaction_state = OpTransactionState::Start; + Err(err) + } + } +} + +pub fn op_transaction_inner( program: &Program, state: &mut ProgramState, insn: &Insn, @@ -2095,175 +2117,189 @@ pub fn op_transaction( }, insn ); - let conn = program.connection.clone(); - let write = matches!(tx_mode, TransactionMode::Write); - if write && conn._db.open_flags.contains(OpenFlags::ReadOnly) { - return Err(LimboError::ReadOnly); - } - let pager = program.get_pager_from_database_index(db); - - // 1. We try to upgrade current version - let current_state = conn.transaction_state.get(); - let (new_transaction_state, updated) = if conn.is_nested_stmt.get() { - (current_state, false) - } else { - match (current_state, write) { - // pending state means that we tried beginning a tx and the method returned IO. - // instead of ending the read tx, just update the state to pending. - (TransactionState::PendingUpgrade, write) => { - turso_assert!( - write, - "pending upgrade should only be set for write transactions" - ); - ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ) - } - (TransactionState::Write { schema_did_change }, true) => { - (TransactionState::Write { schema_did_change }, false) - } - (TransactionState::Write { schema_did_change }, false) => { - (TransactionState::Write { schema_did_change }, false) - } - (TransactionState::Read, true) => ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ), - (TransactionState::Read, false) => (TransactionState::Read, false), - (TransactionState::None, true) => ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ), - (TransactionState::None, false) => (TransactionState::Read, true), - } - }; - - // 2. Start transaction if needed - if let Some(mv_store) = &mv_store { - // In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed. - // Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough - // for both. - if program.connection.mv_tx.get().is_none() { - // We allocate the first page lazily in the first transaction. - return_if_io!(pager.maybe_allocate_page1()); - // TODO: when we fix MVCC enable schema cookie detection for reprepare statements - // let header_schema_cookie = pager - // .io - // .block(|| pager.with_header(|header| header.schema_cookie.get()))?; - // if header_schema_cookie != *schema_cookie { - // return Err(LimboError::SchemaUpdated); - // } - let tx_id = match tx_mode { - TransactionMode::None | TransactionMode::Read | TransactionMode::Concurrent => { - mv_store.begin_tx(pager.clone())? + loop { + match state.op_transaction_state { + OpTransactionState::Start => { + let conn = program.connection.clone(); + let write = matches!(tx_mode, TransactionMode::Write); + if write && conn._db.open_flags.contains(OpenFlags::ReadOnly) { + return Err(LimboError::ReadOnly); } - TransactionMode::Write => { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None)) - } - }; - program.connection.mv_tx.set(Some((tx_id, *tx_mode))); - } else if updated { - // TODO: fix tx_mode in Insn::Transaction, now each statement overrides it even if there's already a CONCURRENT Tx in progress, for example - let mv_tx_mode = program.connection.mv_tx.get().unwrap().1; - let actual_tx_mode = if mv_tx_mode == TransactionMode::Concurrent { - TransactionMode::Concurrent - } else { - *tx_mode - }; - if matches!(new_transaction_state, TransactionState::Write { .. }) - && matches!(actual_tx_mode, TransactionMode::Write) - { - let (tx_id, mv_tx_mode) = program.connection.mv_tx.get().unwrap(); - if mv_tx_mode == TransactionMode::Read { - return_if_io!(mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id))); + + // 1. We try to upgrade current version + let current_state = conn.transaction_state.get(); + let (new_transaction_state, updated) = if conn.is_nested_stmt.get() { + (current_state, false) } else { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))); + match (current_state, write) { + // pending state means that we tried beginning a tx and the method returned IO. + // instead of ending the read tx, just update the state to pending. + (TransactionState::PendingUpgrade, write) => { + turso_assert!( + write, + "pending upgrade should only be set for write transactions" + ); + ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ) + } + (TransactionState::Write { schema_did_change }, true) => { + (TransactionState::Write { schema_did_change }, false) + } + (TransactionState::Write { schema_did_change }, false) => { + (TransactionState::Write { schema_did_change }, false) + } + (TransactionState::Read, true) => ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ), + (TransactionState::Read, false) => (TransactionState::Read, false), + (TransactionState::None, true) => ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ), + (TransactionState::None, false) => (TransactionState::Read, true), + } + }; + + // 2. Start transaction if needed + if let Some(mv_store) = &mv_store { + // In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed. + // Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough + // for both. + if program.connection.mv_tx.get().is_none() { + // We allocate the first page lazily in the first transaction. + return_if_io!(pager.maybe_allocate_page1()); + // TODO: when we fix MVCC enable schema cookie detection for reprepare statements + // let header_schema_cookie = pager + // .io + // .block(|| pager.with_header(|header| header.schema_cookie.get()))?; + // if header_schema_cookie != *schema_cookie { + // return Err(LimboError::SchemaUpdated); + // } + let tx_id = match tx_mode { + TransactionMode::None + | TransactionMode::Read + | TransactionMode::Concurrent => mv_store.begin_tx(pager.clone())?, + TransactionMode::Write => { + return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None)) + } + }; + program.connection.mv_tx.set(Some((tx_id, *tx_mode))); + } else if updated { + // TODO: fix tx_mode in Insn::Transaction, now each statement overrides it even if there's already a CONCURRENT Tx in progress, for example + let mv_tx_mode = program.connection.mv_tx.get().unwrap().1; + let actual_tx_mode = if mv_tx_mode == TransactionMode::Concurrent { + TransactionMode::Concurrent + } else { + *tx_mode + }; + if matches!(new_transaction_state, TransactionState::Write { .. }) + && matches!(actual_tx_mode, TransactionMode::Write) + { + let (tx_id, mv_tx_mode) = program.connection.mv_tx.get().unwrap(); + if mv_tx_mode == TransactionMode::Read { + return_if_io!( + mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id)) + ); + } else { + return_if_io!( + mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id)) + ); + } + } + } + } else { + if matches!(tx_mode, TransactionMode::Concurrent) { + return Err(LimboError::TxError( + "Concurrent transaction mode is only supported when MVCC is enabled" + .to_string(), + )); + } + if updated && matches!(current_state, TransactionState::None) { + turso_assert!( + !conn.is_nested_stmt.get(), + "nested stmt should not begin a new read transaction" + ); + pager.begin_read_tx()?; + } + + if updated && matches!(new_transaction_state, TransactionState::Write { .. }) { + turso_assert!( + !conn.is_nested_stmt.get(), + "nested stmt should not begin a new write transaction" + ); + let begin_w_tx_res = pager.begin_write_tx(); + if let Err(LimboError::Busy) = begin_w_tx_res { + // We failed to upgrade to write transaction so put the transaction into its original state. + // That is, if the transaction had not started, end the read transaction so that next time we + // start a new one. + if matches!(current_state, TransactionState::None) { + pager.end_read_tx()?; + conn.transaction_state.replace(TransactionState::None); + } + assert_eq!(conn.transaction_state.get(), current_state); + return Err(LimboError::Busy); + } + if let IOResult::IO(io) = begin_w_tx_res? { + // set the transaction state to pending so we don't have to + // end the read transaction. + program + .connection + .transaction_state + .replace(TransactionState::PendingUpgrade); + return Ok(InsnFunctionStepResult::IO(io)); + } + } } - } - } - } else { - if matches!(tx_mode, TransactionMode::Concurrent) { - return Err(LimboError::TxError( - "Concurrent transaction mode is only supported when MVCC is enabled".to_string(), - )); - } - if updated && matches!(current_state, TransactionState::None) { - turso_assert!( - !conn.is_nested_stmt.get(), - "nested stmt should not begin a new read transaction" - ); - pager.begin_read_tx()?; - } - if updated && matches!(new_transaction_state, TransactionState::Write { .. }) { - turso_assert!( - !conn.is_nested_stmt.get(), - "nested stmt should not begin a new write transaction" - ); - let begin_w_tx_res = pager.begin_write_tx(); - if let Err(LimboError::Busy) = begin_w_tx_res { - // We failed to upgrade to write transaction so put the transaction into its original state. - // That is, if the transaction had not started, end the read transaction so that next time we - // start a new one. - if matches!(current_state, TransactionState::None) { - pager.end_read_tx()?; - conn.transaction_state.replace(TransactionState::None); + // 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error + if updated { + conn.transaction_state.replace(new_transaction_state); } - assert_eq!(conn.transaction_state.get(), current_state); - return Err(LimboError::Busy); + + state.op_transaction_state = OpTransactionState::CheckSchemaCookie; + continue; } - if let IOResult::IO(io) = begin_w_tx_res? { - // set the transaction state to pending so we don't have to - // end the read transaction. - program - .connection - .transaction_state - .replace(TransactionState::PendingUpgrade); - return Ok(InsnFunctionStepResult::IO(io)); + // 4. Check whether schema has changed if we are actually going to access the database. + // Can only read header if page 1 has been allocated already + // begin_write_tx that happens, but not begin_read_tx + // TODO: this is a hack to make the pager run the IO loop + OpTransactionState::CheckSchemaCookie => { + let res = pager + .io + .block(|| pager.with_header(|header| header.schema_cookie.get())); + match res { + Ok(header_schema_cookie) => { + if header_schema_cookie != *schema_cookie { + tracing::debug!( + "schema changed, force reprepare: {} != {}", + header_schema_cookie, + *schema_cookie + ); + return Err(LimboError::SchemaUpdated); + } + } + // This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution + Err(LimboError::Page1NotAlloc) => {} + Err(err) => { + return Err(err); + } + } + + state.pc += 1; + Ok(InsnFunctionStepResult::Step) } } } - - // 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error - if updated { - conn.transaction_state.replace(new_transaction_state); - } - - // 4. Check whether schema has changed if we are actually going to access the database. - // Can only read header if page 1 has been allocated already - // begin_write_tx that happens, but not begin_read_tx - // TODO: this is a hack to make the pager run the IO loop - let res = pager - .io - .block(|| pager.with_header(|header| header.schema_cookie.get())); - match res { - Ok(header_schema_cookie) => { - if header_schema_cookie != *schema_cookie { - tracing::debug!( - "schema changed, force reprepare: {} != {}", - header_schema_cookie, - *schema_cookie - ); - return Err(LimboError::SchemaUpdated); - } - } - // This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution - Err(LimboError::Page1NotAlloc) => {} - Err(err) => { - return Err(err); - } - } - - state.pc += 1; - Ok(InsnFunctionStepResult::Step) } pub fn op_auto_commit( diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index c93bebf65..a39902bf0 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -37,6 +37,7 @@ use crate::{ execute::{ OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState, + OpTransactionState, }, metrics::StatementMetrics, }, @@ -290,6 +291,7 @@ pub struct ProgramState { current_collation: Option, op_column_state: OpColumnState, op_row_id_state: OpRowIdState, + op_transaction_state: OpTransactionState, /// State machine for committing view deltas with I/O handling view_delta_state: ViewDeltaCommitState, } @@ -333,6 +335,7 @@ impl ProgramState { current_collation: None, op_column_state: OpColumnState::Start, op_row_id_state: OpRowIdState::Start, + op_transaction_state: OpTransactionState::Start, view_delta_state: ViewDeltaCommitState::NotStarted, } } From 18feacf8242712bcf582f915966b8baa0d64a489 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 14:50:43 +0400 Subject: [PATCH 03/17] remove io.block from op_sorter_open --- core/vdbe/execute.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index dfcc53994..a0708f74c 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -3926,14 +3926,13 @@ pub fn op_sorter_open( }, insn ); - let cache_size = program.connection.get_cache_size(); - // Set the buffer size threshold to be roughly the same as the limit configured for the page-cache. - let page_size = pager - .io - .block(|| pager.with_header(|header| header.page_size)) - .unwrap_or_default() - .get() as usize; + // be careful here - we must not use any async operations after pager.with_header because this op-code has no proper state-machine + let page_size = return_if_io!(pager.with_header(|header| header.page_size)); + let page_size = page_size.get() as usize; + let cache_size = program.connection.get_cache_size(); + + // Set the buffer size threshold to be roughly the same as the limit configured for the page-cache. let max_buffer_size_bytes = if cache_size < 0 { (cache_size.abs() * 1024) as usize } else { From 85b671997ec55492a21a1902d3f296e0077385c2 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 14:50:56 +0400 Subject: [PATCH 04/17] remove io.block from op_open_ephemeral --- core/vdbe/execute.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index a0708f74c..244d117c0 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -7110,6 +7110,7 @@ pub fn op_open_ephemeral( match &mut state.op_open_ephemeral_state { OpOpenEphemeralState::Start => { tracing::trace!("Start"); + let page_size = return_if_io!(pager.with_header(|header| header.page_size)); let conn = program.connection.clone(); let io = conn.pager.borrow().io.clone(); let rand_num = io.generate_random_number(); @@ -7142,11 +7143,6 @@ pub fn op_open_ephemeral( db_file_io = io; } - let page_size = pager - .io - .block(|| pager.with_header(|header| header.page_size))? - .get(); - let buffer_pool = program.connection._db.buffer_pool.clone(); let page_cache = Arc::new(RwLock::new(PageCache::default())); @@ -7159,12 +7155,6 @@ pub fn op_open_ephemeral( Arc::new(AtomicDbState::new(DbState::Uninitialized)), Arc::new(Mutex::new(())), )?); - - let page_size = pager - .io - .block(|| pager.with_header(|header| header.page_size)) - .unwrap_or_default(); - pager.page_size.set(Some(page_size)); state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager }; From 27627bdb8d50e6a4da0b77d2691b245bc1aee82e Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 14:51:08 +0400 Subject: [PATCH 05/17] remove IOExt import from execute --- core/vdbe/execute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 244d117c0..e193cd05f 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -14,7 +14,7 @@ use crate::types::{ compare_immutable, compare_records_generic, Extendable, IOCompletions, ImmutableRecord, SeekResult, Text, }; -use crate::util::{normalize_ident, IOExt as _}; +use crate::util::normalize_ident; use crate::vdbe::insn::InsertFlags; use crate::vdbe::registers_to_ref_values; use crate::vector::{vector_concat, vector_slice}; From c4c9d409c95c5e2c8bc93cb6057ae3171820da17 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 16:35:36 +0400 Subject: [PATCH 06/17] remove io.block from op_checkpoint --- core/vdbe/execute.rs | 110 +++++++++++++++++++++++++++++++++++-------- core/vdbe/mod.rs | 8 ++-- 2 files changed, 96 insertions(+), 22 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index e193cd05f..ccb7f4b3e 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -330,12 +330,35 @@ pub fn op_bit_not( Ok(InsnFunctionStepResult::Step) } +#[derive(Debug)] +pub enum OpCheckpointState { + StartCheckpoint, + FinishCheckpoint { result: Option }, + CompleteResult { result: Result }, +} + pub fn op_checkpoint( program: &Program, state: &mut ProgramState, insn: &Insn, pager: &Arc, mv_store: Option<&Arc>, +) -> Result { + match op_checkpoint_inner(program, state, insn, pager, mv_store) { + Ok(result) => Ok(result), + Err(err) => { + state.op_checkpoint_state = OpCheckpointState::StartCheckpoint; + return Err(err); + } + } +} + +pub fn op_checkpoint_inner( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Rc, + mv_store: Option<&Arc>, ) -> Result { load_insn!( Checkpoint { @@ -352,26 +375,75 @@ pub fn op_checkpoint( // however. return Err(LimboError::TableLocked); } - let result = program.connection.checkpoint(*checkpoint_mode); - match result { - Ok(CheckpointResult { - num_attempted, - num_backfilled, - .. - }) => { - // https://sqlite.org/pragma.html#pragma_wal_checkpoint - // 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy). - state.registers[*dest] = Register::Value(Value::Integer(0)); - // 2nd col: # modified pages written to wal file - state.registers[*dest + 1] = Register::Value(Value::Integer(num_attempted as i64)); - // 3rd col: # pages moved to db after checkpoint - state.registers[*dest + 2] = Register::Value(Value::Integer(num_backfilled as i64)); - } - Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)), - } + loop { + match &mut state.op_checkpoint_state { + OpCheckpointState::StartCheckpoint => { + let step_result = program + .connection + .pager + .borrow_mut() + .wal_checkpoint_start(*checkpoint_mode); + match step_result { + Ok(IOResult::Done(result)) => { + state.op_checkpoint_state = OpCheckpointState::FinishCheckpoint { + result: Some(result), + }; + continue; + } + Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), + Err(err) => { + state.op_checkpoint_state = + OpCheckpointState::CompleteResult { result: Err(err) }; + continue; + } + } + } + OpCheckpointState::FinishCheckpoint { result } => { + let step_result = program + .connection + .pager + .borrow_mut() + .wal_checkpoint_finish(result.as_mut().unwrap()); + match step_result { + Ok(IOResult::Done(())) => { + state.op_checkpoint_state = OpCheckpointState::CompleteResult { + result: Ok(result.take().unwrap()), + }; + continue; + } + Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), + Err(err) => { + state.op_checkpoint_state = + OpCheckpointState::CompleteResult { result: Err(err) }; + continue; + } + } + } + OpCheckpointState::CompleteResult { result } => { + match result { + Ok(CheckpointResult { + num_attempted, + num_backfilled, + .. + }) => { + // https://sqlite.org/pragma.html#pragma_wal_checkpoint + // 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy). + state.registers[*dest] = Register::Value(Value::Integer(0)); + // 2nd col: # modified pages written to wal file + state.registers[*dest + 1] = + Register::Value(Value::Integer(*num_attempted as i64)); + // 3rd col: # pages moved to db after checkpoint + state.registers[*dest + 2] = + Register::Value(Value::Integer(*num_backfilled as i64)); + } + Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)), + } - state.pc += 1; - Ok(InsnFunctionStepResult::Step) + state.pc += 1; + return Ok(InsnFunctionStepResult::Step); + } + } + } } pub fn op_null( diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index a39902bf0..66fc2a774 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -35,9 +35,9 @@ use crate::{ types::{IOCompletions, IOResult, RawSlice, TextRef}, vdbe::{ execute::{ - OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState, - OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState, - OpTransactionState, + OpCheckpointState, OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, + OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, + OpSeekState, OpTransactionState, }, metrics::StatementMetrics, }, @@ -292,6 +292,7 @@ pub struct ProgramState { op_column_state: OpColumnState, op_row_id_state: OpRowIdState, op_transaction_state: OpTransactionState, + op_checkpoint_state: OpCheckpointState, /// State machine for committing view deltas with I/O handling view_delta_state: ViewDeltaCommitState, } @@ -336,6 +337,7 @@ impl ProgramState { op_column_state: OpColumnState::Start, op_row_id_state: OpRowIdState::Start, op_transaction_state: OpTransactionState::Start, + op_checkpoint_state: OpCheckpointState::StartCheckpoint, view_delta_state: ViewDeltaCommitState::NotStarted, } } From 2c09d17dfe15dc7076a93d37006d89879e9e8318 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 16:36:08 +0400 Subject: [PATCH 07/17] make checkpoint async --- core/storage/pager.rs | 62 +++++++++------ core/storage/wal.rs | 170 +++++++++++++++++++++++++++--------------- 2 files changed, 151 insertions(+), 81 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index a099e438c..0114e58ef 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1675,15 +1675,20 @@ impl Pager { } #[instrument(skip_all, level = Level::DEBUG)] - pub fn wal_checkpoint(&self, mode: CheckpointMode) -> Result { + pub fn wal_checkpoint_start(&self, mode: CheckpointMode) -> Result> { let Some(wal) = self.wal.as_ref() else { return Err(LimboError::InternalError( "wal_checkpoint() called on database without WAL".to_string(), )); }; - let mut checkpoint_result = self.io.block(|| wal.borrow_mut().checkpoint(self, mode))?; + wal.borrow_mut().checkpoint(self, mode) + } + pub fn wal_checkpoint_finish( + &self, + checkpoint_result: &mut CheckpointResult, + ) -> Result> { 'ensure_sync: { if checkpoint_result.num_backfilled != 0 { if checkpoint_result.everything_backfilled() { @@ -1694,27 +1699,35 @@ impl Pager { let page_size = self.page_size.get().unwrap_or_default(); let expected = (db_size * page_size.get()) as u64; if expected < self.db_file.size()? { - self.io.wait_for_completion(self.db_file.truncate( - expected as usize, - Completion::new_trunc(move |_| { - tracing::trace!( - "Database file truncated to expected size: {} bytes", - expected - ); - }), - )?)?; - self.io - .wait_for_completion(self.db_file.sync(Completion::new_sync( - move |_| { - tracing::trace!("Database file syncd after truncation"); - }, - ))?)?; + if !checkpoint_result.db_truncate_sent { + let c = self.db_file.truncate( + expected as usize, + Completion::new_trunc(move |_| { + tracing::trace!( + "Database file truncated to expected size: {} bytes", + expected + ); + }), + )?; + checkpoint_result.db_truncate_sent = true; + io_yield_one!(c); + } + if !checkpoint_result.db_sync_sent { + let c = self.db_file.sync(Completion::new_sync(move |_| { + tracing::trace!("Database file syncd after truncation"); + }))?; + checkpoint_result.db_sync_sent = true; + io_yield_one!(c); + } break 'ensure_sync; } } - // if we backfilled at all, we have to sync the db-file here - self.io - .wait_for_completion(self.db_file.sync(Completion::new_sync(move |_| {}))?)?; + if !checkpoint_result.db_sync_sent { + // if we backfilled at all, we have to sync the db-file here + let c = self.db_file.sync(Completion::new_sync(move |_| {}))?; + checkpoint_result.db_sync_sent = true; + io_yield_one!(c); + } } } checkpoint_result.release_guard(); @@ -1723,7 +1736,14 @@ impl Pager { .write() .clear() .map_err(|e| LimboError::InternalError(format!("Failed to clear page cache: {e:?}")))?; - Ok(checkpoint_result) + Ok(IOResult::Done(())) + } + + #[instrument(skip_all, level = Level::DEBUG)] + pub fn wal_checkpoint(&self, mode: CheckpointMode) -> Result { + let mut result = self.io.block(|| self.wal_checkpoint_start(mode))?; + self.io.block(|| self.wal_checkpoint_finish(&mut result))?; + Ok(result) } pub fn freepage_list(&self) -> u32 { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index be77c7890..015b72dda 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -24,8 +24,8 @@ use crate::storage::sqlite3_ondisk::{ }; use crate::types::{IOCompletions, IOResult}; use crate::{ - bail_corrupt_error, io_yield_many, turso_assert, Buffer, Completion, CompletionError, - IOContext, LimboError, Result, + bail_corrupt_error, io_yield_many, io_yield_one, return_if_io, turso_assert, Buffer, + Completion, CompletionError, IOContext, LimboError, Result, }; #[derive(Debug, Clone, Default)] @@ -38,6 +38,8 @@ pub struct CheckpointResult { /// In the case of everything backfilled, we need to hold the locks until the db /// file is truncated. maybe_guard: Option, + pub db_truncate_sent: bool, + pub db_sync_sent: bool, } impl Drop for CheckpointResult { @@ -53,6 +55,8 @@ impl CheckpointResult { num_backfilled: n_ckpt, max_frame, maybe_guard: None, + db_sync_sent: false, + db_truncate_sent: false, } } @@ -314,11 +318,16 @@ pub trait Wal: Debug { fn as_any(&self) -> &dyn std::any::Any; } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Clone)] pub enum CheckpointState { Start, Processing, - Done, + Finalize, + Truncate { + checkpoint_result: Option, + truncate_sent: bool, + sync_sent: bool, + }, } /// IOV_MAX is 1024 on most systems, lets use 512 to be safe @@ -1694,7 +1703,7 @@ impl WalFile { mode: CheckpointMode, ) -> Result> { loop { - let state = self.ongoing_checkpoint.state; + let state = &mut self.ongoing_checkpoint.state; tracing::debug!(?state); match state { // Acquire the relevant exclusive locks and checkpoint_lock @@ -1716,6 +1725,8 @@ impl WalFile { num_backfilled: self.prev_checkpoint.num_backfilled, max_frame: nbackfills, maybe_guard: None, + db_sync_sent: false, + db_truncate_sent: false, })); } // acquire the appropriate exclusive locks depending on the checkpoint mode @@ -1868,19 +1879,19 @@ impl WalFile { if !completions.is_empty() { io_yield_many!(completions); } else if self.ongoing_checkpoint.complete() { - self.ongoing_checkpoint.state = CheckpointState::Done; + self.ongoing_checkpoint.state = CheckpointState::Finalize; } } // All eligible frames copied to the db file // Update nBackfills // In Restart or Truncate mode, we need to restart the log over and possibly truncate the file // Release all locks and return the current num of wal frames and the amount we backfilled - CheckpointState::Done => { + CheckpointState::Finalize => { turso_assert!( self.ongoing_checkpoint.complete(), "checkpoint pending flush must have finished" ); - let mut checkpoint_result = { + let checkpoint_result = { let shared = self.get_shared(); let current_mx = shared.max_frame.load(Ordering::Acquire); let nbackfills = shared.nbackfills.load(Ordering::Acquire); @@ -1925,9 +1936,24 @@ impl WalFile { if mode.require_all_backfilled() && !checkpoint_result.everything_backfilled() { return Err(LimboError::Busy); } - if mode.should_restart_log() { - self.restart_log(mode)?; - } + self.restart_log_if_needed(mode)?; + self.ongoing_checkpoint.state = CheckpointState::Truncate { + checkpoint_result: Some(checkpoint_result), + truncate_sent: false, + sync_sent: false, + }; + } + CheckpointState::Truncate { .. } => { + return_if_io!(self.truncate_log_if_needed(mode)); + let mut checkpoint_result = { + let CheckpointState::Truncate { + checkpoint_result, .. + } = &mut self.ongoing_checkpoint.state + else { + panic!("unxpected state"); + }; + checkpoint_result.take().unwrap() + }; // increment wal epoch to ensure no stale pages are used for backfilling self.get_shared().epoch.fetch_add(1, Ordering::Release); @@ -2025,11 +2051,10 @@ impl WalFile { /// Called once the entire WAL has been back‑filled in RESTART or TRUNCATE mode. /// Must be invoked while writer and checkpoint locks are still held. - fn restart_log(&mut self, mode: CheckpointMode) -> Result<()> { - turso_assert!( - mode.should_restart_log(), - "CheckpointMode must be Restart or Truncate" - ); + fn restart_log_if_needed(&mut self, mode: CheckpointMode) -> Result<()> { + if !mode.should_restart_log() { + return Ok(()); + } turso_assert!( matches!(self.checkpoint_guard, Some(CheckpointLocks::Writer { .. })), "We must hold writer and checkpoint locks to restart the log, found: {:?}", @@ -2054,62 +2079,88 @@ impl WalFile { } } - let unlock = |e: Option<&LimboError>| { - // release all read locks we just acquired, the caller will take care of the others - let shared = self.shared.write(); - for idx in 1..shared.read_locks.len() { - shared.read_locks[idx].unlock(); - } - if let Some(e) = e { - tracing::error!( - "Failed to restart WAL header: {:?}, releasing read locks", - e - ); - } - }; // reinitialize in‑memory state - self.get_shared_mut() - .restart_wal_header(&self.io, mode) - .inspect_err(|e| { - unlock(Some(e)); - })?; + self.get_shared_mut().restart_wal_header(&self.io, mode); let cksm = self.get_shared().last_checksum; self.last_checksum = cksm; self.max_frame = 0; self.min_frame = 0; self.checkpoint_seq.fetch_add(1, Ordering::Release); + Ok(()) + } - // For TRUNCATE mode: shrink the WAL file to 0 B - if matches!(mode, CheckpointMode::Truncate { .. }) { - let c = Completion::new_trunc(|_| { - tracing::trace!("WAL file truncated to 0 B"); - }); + fn truncate_log_if_needed(&mut self, mode: CheckpointMode) -> Result> { + if !matches!(mode, CheckpointMode::Truncate { .. }) { + Self::unlock(&self.shared, None); + return Ok(IOResult::Done(())); + } + let file = { let shared = self.get_shared(); - // for now at least, lets do all this IO syncronously assert!( shared.enabled.load(Ordering::Relaxed), "WAL must be enabled" ); - let file = shared.file.as_ref().unwrap(); - let c = file.truncate(0, c).inspect_err(|e| unlock(Some(e)))?; shared.initialized.store(false, Ordering::Release); - self.io - .wait_for_completion(c) - .inspect_err(|e| unlock(Some(e)))?; - // fsync after truncation - self.io - .wait_for_completion( - file.sync(Completion::new_sync(|_| { - tracing::trace!("WAL file synced after reset/truncation"); - })) - .inspect_err(|e| unlock(Some(e)))?, - ) - .inspect_err(|e| unlock(Some(e)))?; - } + shared.file.as_ref().unwrap().clone() + }; - // release read‑locks 1..4 - unlock(None); - Ok(()) + let CheckpointState::Truncate { + sync_sent, + truncate_sent, + .. + } = &mut self.ongoing_checkpoint.state + else { + panic!("unxpected state"); + }; + if !*truncate_sent { + // For TRUNCATE mode: shrink the WAL file to 0 B + + let c = Completion::new_trunc({ + let shared = self.shared.clone(); + move |result| { + if let Err(err) = result { + Self::unlock(&shared, Some(&LimboError::InternalError(err.to_string()))); + } else { + tracing::trace!("WAL file truncated to 0 B"); + } + } + }); + let c = file + .truncate(0, c) + .inspect_err(|e| Self::unlock(&self.shared, Some(e)))?; + *truncate_sent = true; + io_yield_one!(c); + } else if !*sync_sent { + let shared = self.shared.clone(); + let c = file + .sync(Completion::new_sync(move |result| { + if let Err(err) = result { + Self::unlock(&shared, Some(&LimboError::InternalError(err.to_string()))); + } else { + tracing::trace!("WAL file synced after reset/truncation"); + } + })) + .inspect_err(|e| Self::unlock(&self.shared, Some(e)))?; + *sync_sent = true; + io_yield_one!(c); + } else { + Self::unlock(&self.shared, None); + } + Ok(IOResult::Done(())) + } + + fn unlock(shared: &Arc>, e: Option<&LimboError>) { + // release all read locks we just acquired, the caller will take care of the others + let shared = shared.write(); + for idx in 1..shared.read_locks.len() { + shared.read_locks[idx].unlock(); + } + if let Some(e) = e { + tracing::error!( + "Failed to restart WAL header: {:?}, releasing read locks", + e + ); + } } fn acquire_proper_checkpoint_guard(&mut self, mode: CheckpointMode) -> Result<()> { @@ -2327,7 +2378,7 @@ impl WalFileShared { /// This function updates the shared-memory structures so that the next /// client to write to the database (which may be this one) does so by /// writing frames into the start of the log file. - fn restart_wal_header(&mut self, io: &Arc, mode: CheckpointMode) -> Result<()> { + fn restart_wal_header(&mut self, io: &Arc, mode: CheckpointMode) { turso_assert!( matches!( mode, @@ -2354,7 +2405,6 @@ impl WalFileShared { for lock in &self.read_locks[2..] { lock.set_value_exclusive(READMARK_NOT_USED); } - Ok(()) } } From a8bc06ea9c56c3b72b2effc58cea60da8fbfc84c Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 16:37:11 +0400 Subject: [PATCH 08/17] fix clippy --- core/vdbe/execute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index ccb7f4b3e..80b04fb97 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -348,7 +348,7 @@ pub fn op_checkpoint( Ok(result) => Ok(result), Err(err) => { state.op_checkpoint_state = OpCheckpointState::StartCheckpoint; - return Err(err); + Err(err) } } } From b2afdb8d290018ffc7a0b613986551ee62b9d38a Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 16:43:13 +0400 Subject: [PATCH 09/17] fix comment --- core/storage/wal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 015b72dda..7ac294346 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1439,7 +1439,7 @@ impl Wal for WalFile { ); turso_assert!( self.get_shared().is_initialized()?, - "WAL must be prepared with prepare_append method" + "WAL must be prepared with prepare_wal_start/prepare_wal_finish method" ); let (header, shared_page_size, epoch) = { From dea041b7c091fa1e08123b2db301e3d5e149adfe Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 16:50:40 +0400 Subject: [PATCH 10/17] fix after rebase --- core/vdbe/execute.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 80b04fb97..c34a1da0d 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -357,7 +357,7 @@ pub fn op_checkpoint_inner( program: &Program, state: &mut ProgramState, insn: &Insn, - pager: &Rc, + pager: &Arc, mv_store: Option<&Arc>, ) -> Result { load_insn!( @@ -2346,11 +2346,9 @@ pub fn op_transaction_inner( // begin_write_tx that happens, but not begin_read_tx // TODO: this is a hack to make the pager run the IO loop OpTransactionState::CheckSchemaCookie => { - let res = pager - .io - .block(|| pager.with_header(|header| header.schema_cookie.get())); + let res = pager.with_header(|header| header.schema_cookie.get()); match res { - Ok(header_schema_cookie) => { + Ok(IOResult::Done(header_schema_cookie)) => { if header_schema_cookie != *schema_cookie { tracing::debug!( "schema changed, force reprepare: {} != {}", @@ -2360,6 +2358,7 @@ pub fn op_transaction_inner( return Err(LimboError::SchemaUpdated); } } + Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), // This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution Err(LimboError::Page1NotAlloc) => {} Err(err) => { @@ -2368,7 +2367,7 @@ pub fn op_transaction_inner( } state.pc += 1; - Ok(InsnFunctionStepResult::Step) + return Ok(InsnFunctionStepResult::Step); } } } From 5c4d8aa10bb79e5be0b0b050982c9ee0fafbddc8 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 18:05:12 +0400 Subject: [PATCH 11/17] fix bug after making checkpoint async --- core/storage/pager.rs | 15 ++++++++------- core/storage/wal.rs | 28 +++++++++++++++------------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 0114e58ef..f0bffb2c4 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -321,7 +321,7 @@ enum CommitState { /// Sync WAL header after prepare PrepareWalSync, /// Appends all frames to the WAL. - Start, + PrepareFrames, /// Fsync the on-disk WAL. SyncWal, /// Checkpoint the WAL to the database file (if needed). @@ -1263,7 +1263,8 @@ impl Pager { let mut pages = Vec::with_capacity(len); let page_sz = self.page_size.get().unwrap_or_default(); - if let Some(c) = wal.borrow_mut().prepare_wal_start(page_sz)? { + let prepare = wal.borrow_mut().prepare_wal_start(page_sz)?; + if let Some(c) = prepare { self.io.wait_for_completion(c)?; let c = wal.borrow_mut().prepare_wal_finish()?; self.io.wait_for_completion(c)?; @@ -1340,7 +1341,7 @@ impl Pager { let page_sz = self.page_size.get().expect("page size not set"); let c = wal.borrow_mut().prepare_wal_start(page_sz)?; let Some(c) = c else { - self.commit_info.state.set(CommitState::Start); + self.commit_info.state.set(CommitState::PrepareFrames); continue; }; self.commit_info.state.set(CommitState::PrepareWalSync); @@ -1350,12 +1351,12 @@ impl Pager { } CommitState::PrepareWalSync => { let c = wal.borrow_mut().prepare_wal_finish()?; - self.commit_info.state.set(CommitState::Start); + self.commit_info.state.set(CommitState::PrepareFrames); if !c.is_completed() { io_yield_one!(c); } } - CommitState::Start => { + CommitState::PrepareFrames => { let now = self.io.now(); self.commit_info.time.set(now); let db_size_after = { @@ -1503,7 +1504,7 @@ impl Pager { let mut completions = self.commit_info.completions.borrow_mut(); if completions.iter().all(|c| c.is_completed()) { completions.clear(); - self.commit_info.state.set(CommitState::Start); + self.commit_info.state.set(CommitState::PrepareWal); wal.borrow_mut().finish_append_frames_commit()?; let result = self.commit_info.result.borrow_mut().take(); return Ok(IOResult::Done(result.expect("commit result should be set"))); @@ -2229,7 +2230,7 @@ impl Pager { fn reset_internal_states(&self) { *self.checkpoint_state.write() = CheckpointState::Checkpoint; self.syncing.replace(false); - self.commit_info.state.set(CommitState::Start); + self.commit_info.state.set(CommitState::PrepareWal); self.commit_info.time.set(self.io.now()); self.allocate_page_state.replace(AllocatePageState::Start); self.free_page_state.replace(FreePageState::Start); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 7ac294346..6c4b2a242 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1936,7 +1936,9 @@ impl WalFile { if mode.require_all_backfilled() && !checkpoint_result.everything_backfilled() { return Err(LimboError::Busy); } - self.restart_log_if_needed(mode)?; + if mode.should_restart_log() { + self.restart_log(mode)?; + } self.ongoing_checkpoint.state = CheckpointState::Truncate { checkpoint_result: Some(checkpoint_result), truncate_sent: false, @@ -1944,7 +1946,12 @@ impl WalFile { }; } CheckpointState::Truncate { .. } => { - return_if_io!(self.truncate_log_if_needed(mode)); + if matches!(mode, CheckpointMode::Truncate { .. }) { + return_if_io!(self.truncate_log()); + } + if mode.should_restart_log() { + Self::unlock(&self.shared, None); + } let mut checkpoint_result = { let CheckpointState::Truncate { checkpoint_result, .. @@ -2051,10 +2058,11 @@ impl WalFile { /// Called once the entire WAL has been back‑filled in RESTART or TRUNCATE mode. /// Must be invoked while writer and checkpoint locks are still held. - fn restart_log_if_needed(&mut self, mode: CheckpointMode) -> Result<()> { - if !mode.should_restart_log() { - return Ok(()); - } + fn restart_log(&mut self, mode: CheckpointMode) -> Result<()> { + turso_assert!( + mode.should_restart_log(), + "CheckpointMode must be Restart or Truncate" + ); turso_assert!( matches!(self.checkpoint_guard, Some(CheckpointLocks::Writer { .. })), "We must hold writer and checkpoint locks to restart the log, found: {:?}", @@ -2089,11 +2097,7 @@ impl WalFile { Ok(()) } - fn truncate_log_if_needed(&mut self, mode: CheckpointMode) -> Result> { - if !matches!(mode, CheckpointMode::Truncate { .. }) { - Self::unlock(&self.shared, None); - return Ok(IOResult::Done(())); - } + fn truncate_log(&mut self) -> Result> { let file = { let shared = self.get_shared(); assert!( @@ -2143,8 +2147,6 @@ impl WalFile { .inspect_err(|e| Self::unlock(&self.shared, Some(e)))?; *sync_sent = true; io_yield_one!(c); - } else { - Self::unlock(&self.shared, None); } Ok(IOResult::Done(())) } From d16d86b85d7571d1fc4e08ae759b3b4a05ccc673 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 18:25:32 +0400 Subject: [PATCH 12/17] fix blocking ensure_header_if_needed implementation --- core/storage/wal.rs | 94 ++++++++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 44 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 6c4b2a242..39fb79e46 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1408,8 +1408,49 @@ impl Wal for WalFile { Ok(pages) } - fn prepare_wal_start(&mut self, page_sz: PageSize) -> Result> { - self.ensure_header_if_needed(page_sz) + fn prepare_wal_start(&mut self, page_size: PageSize) -> Result> { + if self.get_shared().is_initialized()? { + return Ok(None); + } + tracing::debug!("ensure_header_if_needed"); + self.last_checksum = { + let mut shared = self.get_shared_mut(); + let checksum = { + let mut hdr = shared.wal_header.lock(); + hdr.magic = if cfg!(target_endian = "big") { + WAL_MAGIC_BE + } else { + WAL_MAGIC_LE + }; + if hdr.page_size == 0 { + hdr.page_size = page_size.get(); + } + if hdr.salt_1 == 0 && hdr.salt_2 == 0 { + hdr.salt_1 = self.io.generate_random_number() as u32; + hdr.salt_2 = self.io.generate_random_number() as u32; + } + + // recompute header checksum + let prefix = &hdr.as_bytes()[..WAL_HEADER_SIZE - 8]; + let use_native = (hdr.magic & 1) != 0; + let (c1, c2) = checksum_wal(prefix, &hdr, (0, 0), use_native); + hdr.checksum_1 = c1; + hdr.checksum_2 = c2; + (c1, c2) + }; + shared.last_checksum = checksum; + checksum + }; + + self.max_frame = 0; + let shared = self.get_shared(); + assert!( + shared.enabled.load(Ordering::Relaxed), + "WAL must be enabled" + ); + let file = shared.file.as_ref().unwrap(); + let c = sqlite3_ondisk::begin_write_wal_header(file, &shared.wal_header.lock())?; + Ok(Some(c)) } fn prepare_wal_finish(&mut self) -> Result { @@ -1652,49 +1693,14 @@ impl WalFile { /// the WAL file has been truncated and we are writing the first /// frame since then. We need to ensure that the header is initialized. - fn ensure_header_if_needed(&mut self, page_size: PageSize) -> Result> { - if self.get_shared().is_initialized()? { - return Ok(None); - } - tracing::debug!("ensure_header_if_needed"); - self.last_checksum = { - let mut shared = self.get_shared_mut(); - let checksum = { - let mut hdr = shared.wal_header.lock(); - hdr.magic = if cfg!(target_endian = "big") { - WAL_MAGIC_BE - } else { - WAL_MAGIC_LE - }; - if hdr.page_size == 0 { - hdr.page_size = page_size.get(); - } - if hdr.salt_1 == 0 && hdr.salt_2 == 0 { - hdr.salt_1 = self.io.generate_random_number() as u32; - hdr.salt_2 = self.io.generate_random_number() as u32; - } - - // recompute header checksum - let prefix = &hdr.as_bytes()[..WAL_HEADER_SIZE - 8]; - let use_native = (hdr.magic & 1) != 0; - let (c1, c2) = checksum_wal(prefix, &hdr, (0, 0), use_native); - hdr.checksum_1 = c1; - hdr.checksum_2 = c2; - (c1, c2) - }; - shared.last_checksum = checksum; - checksum + fn ensure_header_if_needed(&mut self, page_size: PageSize) -> Result<()> { + let Some(c) = self.prepare_wal_start(page_size)? else { + return Ok(()); }; - - self.max_frame = 0; - let shared = self.get_shared(); - assert!( - shared.enabled.load(Ordering::Relaxed), - "WAL must be enabled" - ); - let file = shared.file.as_ref().unwrap(); - let c = sqlite3_ondisk::begin_write_wal_header(file, &shared.wal_header.lock())?; - Ok(Some(c)) + self.io.wait_for_completion(c)?; + let c = self.prepare_wal_finish()?; + self.io.wait_for_completion(c)?; + Ok(()) } fn checkpoint_inner( From b5d12b79cd16521af62de07abe967c6d9e8ab92d Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 19:02:47 +0400 Subject: [PATCH 13/17] reset commit_state when commit_dirty_pages is done --- core/storage/pager.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index f0bffb2c4..0330deebe 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1326,6 +1326,26 @@ impl Pager { wal_auto_checkpoint_disabled: bool, sync_mode: crate::SyncMode, data_sync_retry: bool, + ) -> Result> { + match self.commit_dirty_pages_inner( + wal_auto_checkpoint_disabled, + sync_mode, + data_sync_retry, + ) { + r @ (Ok(IOResult::Done(..)) | Err(..)) => { + self.commit_info.state.set(CommitState::PrepareWal); + r + } + Ok(IOResult::IO(io)) => Ok(IOResult::IO(io)), + } + } + + #[instrument(skip_all, level = Level::DEBUG)] + fn commit_dirty_pages_inner( + &self, + wal_auto_checkpoint_disabled: bool, + sync_mode: crate::SyncMode, + data_sync_retry: bool, ) -> Result> { let Some(wal) = self.wal.as_ref() else { return Err(LimboError::InternalError( From f1764d9f76fb423ccbabcf0e28db2730e45b188b Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 19:03:09 +0400 Subject: [PATCH 14/17] fix test --- .../query_processing/test_multi_thread.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/integration/query_processing/test_multi_thread.rs b/tests/integration/query_processing/test_multi_thread.rs index 0a3b6366e..73817be23 100644 --- a/tests/integration/query_processing/test_multi_thread.rs +++ b/tests/integration/query_processing/test_multi_thread.rs @@ -16,10 +16,16 @@ fn test_schema_reprepare() { let mut stmt = conn2.prepare("SELECT y, z FROM t").unwrap(); let mut stmt2 = conn2.prepare("SELECT x, z FROM t").unwrap(); conn1.execute("ALTER TABLE t DROP COLUMN x").unwrap(); - assert_eq!( - stmt2.step().unwrap_err().to_string(), - "Parse error: no such column: x" - ); + loop { + match stmt2.step() { + Ok(StepResult::IO) => tmp_db.io.step().unwrap(), + Err(err) => { + assert_eq!(err.to_string(), "Parse error: no such column: x"); + break; + } + r => panic!("unexpected response: {:?}", r), + } + } let mut rows = Vec::new(); loop { From 93e4dfbd6985332c30ecbb742ae5c8712252dd3b Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 19:16:51 +0400 Subject: [PATCH 15/17] remove one more unnecessary io.block --- core/storage/pager.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 0330deebe..da5bae08c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -320,8 +320,12 @@ enum CommitState { PrepareWal, /// Sync WAL header after prepare PrepareWalSync, + /// Get DB size (mostly from page cache - but in rare cases we can read it from disk) + GetDbSize, /// Appends all frames to the WAL. - PrepareFrames, + PrepareFrames { + db_size: u32, + }, /// Fsync the on-disk WAL. SyncWal, /// Checkpoint the WAL to the database file (if needed). @@ -1361,7 +1365,7 @@ impl Pager { let page_sz = self.page_size.get().expect("page size not set"); let c = wal.borrow_mut().prepare_wal_start(page_sz)?; let Some(c) = c else { - self.commit_info.state.set(CommitState::PrepareFrames); + self.commit_info.state.set(CommitState::GetDbSize); continue; }; self.commit_info.state.set(CommitState::PrepareWalSync); @@ -1371,19 +1375,20 @@ impl Pager { } CommitState::PrepareWalSync => { let c = wal.borrow_mut().prepare_wal_finish()?; - self.commit_info.state.set(CommitState::PrepareFrames); + self.commit_info.state.set(CommitState::GetDbSize); if !c.is_completed() { io_yield_one!(c); } } - CommitState::PrepareFrames => { + CommitState::GetDbSize => { + let db_size = return_if_io!(self.with_header(|header| header.database_size)); + self.commit_info.state.set(CommitState::PrepareFrames { + db_size: db_size.get(), + }); + } + CommitState::PrepareFrames { db_size } => { let now = self.io.now(); self.commit_info.time.set(now); - let db_size_after = { - self.io - .block(|| self.with_header(|header| header.database_size))? - .get() - }; let dirty_ids: Vec = self.dirty_pages.read().iter().copied().collect(); if dirty_ids.is_empty() { @@ -1415,7 +1420,7 @@ impl Pager { if end_of_chunk { let commit_flag = if i == total - 1 { // Only the commit frame (final) frame carries the db_size - Some(db_size_after) + Some(db_size) } else { None }; From c1176356f7b430dfedb5edbb151aeac4b79c6f76 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 19:20:42 +0400 Subject: [PATCH 16/17] small fixes --- core/storage/wal.rs | 19 +++++++++++++------ .../query_processing/test_multi_thread.rs | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 39fb79e46..ba4ecbbfc 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1956,7 +1956,7 @@ impl WalFile { return_if_io!(self.truncate_log()); } if mode.should_restart_log() { - Self::unlock(&self.shared, None); + Self::unlock_after_restart(&self.shared, None); } let mut checkpoint_result = { let CheckpointState::Truncate { @@ -2129,7 +2129,10 @@ impl WalFile { let shared = self.shared.clone(); move |result| { if let Err(err) = result { - Self::unlock(&shared, Some(&LimboError::InternalError(err.to_string()))); + Self::unlock_after_restart( + &shared, + Some(&LimboError::InternalError(err.to_string())), + ); } else { tracing::trace!("WAL file truncated to 0 B"); } @@ -2137,7 +2140,7 @@ impl WalFile { }); let c = file .truncate(0, c) - .inspect_err(|e| Self::unlock(&self.shared, Some(e)))?; + .inspect_err(|e| Self::unlock_after_restart(&self.shared, Some(e)))?; *truncate_sent = true; io_yield_one!(c); } else if !*sync_sent { @@ -2145,19 +2148,23 @@ impl WalFile { let c = file .sync(Completion::new_sync(move |result| { if let Err(err) = result { - Self::unlock(&shared, Some(&LimboError::InternalError(err.to_string()))); + Self::unlock_after_restart( + &shared, + Some(&LimboError::InternalError(err.to_string())), + ); } else { tracing::trace!("WAL file synced after reset/truncation"); } })) - .inspect_err(|e| Self::unlock(&self.shared, Some(e)))?; + .inspect_err(|e| Self::unlock_after_restart(&self.shared, Some(e)))?; *sync_sent = true; io_yield_one!(c); } Ok(IOResult::Done(())) } - fn unlock(shared: &Arc>, e: Option<&LimboError>) { + // unlock shared read locks taken by RESTART/TRUNCATE checkpoint modes + fn unlock_after_restart(shared: &Arc>, e: Option<&LimboError>) { // release all read locks we just acquired, the caller will take care of the others let shared = shared.write(); for idx in 1..shared.read_locks.len() { diff --git a/tests/integration/query_processing/test_multi_thread.rs b/tests/integration/query_processing/test_multi_thread.rs index 73817be23..6054c7a32 100644 --- a/tests/integration/query_processing/test_multi_thread.rs +++ b/tests/integration/query_processing/test_multi_thread.rs @@ -23,7 +23,7 @@ fn test_schema_reprepare() { assert_eq!(err.to_string(), "Parse error: no such column: x"); break; } - r => panic!("unexpected response: {:?}", r), + r => panic!("unexpected response: {r:?}"), } } From bf5397dade5789e7863a2cc28787e7eeaef6ab66 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 21:30:40 +0400 Subject: [PATCH 17/17] fix op_sorter --- core/vdbe/execute.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index c34a1da0d..98574cc80 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -8,7 +8,7 @@ use crate::storage::btree::{ use crate::storage::database::DatabaseFile; use crate::storage::page_cache::PageCache; use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState}; -use crate::storage::sqlite3_ondisk::read_varint; +use crate::storage::sqlite3_ondisk::{read_varint, PageSize}; use crate::translate::collate::CollationSeq; use crate::types::{ compare_immutable, compare_records_generic, Extendable, IOCompletions, ImmutableRecord, @@ -3998,7 +3998,11 @@ pub fn op_sorter_open( insn ); // be careful here - we must not use any async operations after pager.with_header because this op-code has no proper state-machine - let page_size = return_if_io!(pager.with_header(|header| header.page_size)); + let page_size = match pager.with_header(|header| header.page_size) { + Ok(IOResult::Done(page_size)) => page_size, + Err(_) => PageSize::default(), + Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), + }; let page_size = page_size.get() as usize; let cache_size = program.connection.get_cache_size();