From 066ffcc94023b53011721db88bd5049b93a1257d Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 10 Jul 2025 13:56:47 +0200 Subject: [PATCH 01/10] append frame one by one Let's make sure we don't end up in a weird situation by appending frames one by one and we can later think of optimizations. --- core/storage/pager.rs | 184 +++++++++++++++++++++++++++++++----------- 1 file changed, 137 insertions(+), 47 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 84c15cb5b..2d7d49210 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -186,8 +186,10 @@ impl Page { enum CacheFlushState { /// Idle. Start, - /// Waiting for all in-flight writes to the on-disk WAL to complete. - WaitAppendFrames, + /// Append a single frame to the WAL. + AppendFrame { current_page_to_append_idx: usize }, + /// Wait for append frame to complete. + WaitAppendFrame { current_page_to_append_idx: usize }, } #[derive(Clone, Copy, Debug)] @@ -195,8 +197,11 @@ enum CacheFlushState { enum CommitState { /// Idle. Start, - /// Waiting for all in-flight writes to the on-disk WAL to complete. - WaitAppendFrames, + /// Append a single frame to the WAL. + AppendFrame { current_page_to_append_idx: usize }, + /// Wait for append frame to complete. + /// If the current page is the last page to append, sync wal and clear dirty pages and cache. + WaitAppendFrame { current_page_to_append_idx: usize }, /// Fsync the on-disk WAL. SyncWal, /// Checkpoint the WAL to the database file (if needed). @@ -230,6 +235,8 @@ struct CommitInfo { state: CommitState, /// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync. in_flight_writes: Rc>, + /// Dirty pages to be flushed. + dirty_pages: Vec, } /// This will keep track of the state of current cache flush in order to not repeat work @@ -237,6 +244,8 @@ struct FlushInfo { state: CacheFlushState, /// Number of writes taking place. in_flight_writes: Rc>, + /// Dirty pages to be flushed. + dirty_pages: Vec, } /// Track the state of the auto-vacuum mode. @@ -394,6 +403,7 @@ impl Pager { commit_info: RefCell::new(CommitInfo { state: CommitState::Start, in_flight_writes: Rc::new(RefCell::new(0)), + dirty_pages: Vec::new(), }), syncing: Rc::new(RefCell::new(false)), checkpoint_state: RefCell::new(CheckpointState::Checkpoint), @@ -408,6 +418,7 @@ impl Pager { flush_info: RefCell::new(FlushInfo { state: CacheFlushState::Start, in_flight_writes: Rc::new(RefCell::new(0)), + dirty_pages: Vec::new(), }), free_page_state: RefCell::new(FreePageState::Start), }) @@ -894,30 +905,70 @@ impl Pager { trace!(?state); match state { CacheFlushState::Start => { - for page_id in self.dirty_pages.borrow().iter() { + self.dirty_pages.borrow_mut().clear(); + let dirty_pages = self + .dirty_pages + .borrow() + .iter() + .copied() + .collect::>(); + let mut flush_info = self.flush_info.borrow_mut(); + if dirty_pages.is_empty() { + Ok(IOResult::Done(())) + } else { + flush_info.dirty_pages = dirty_pages; + flush_info.state = CacheFlushState::AppendFrame { + current_page_to_append_idx: 0, + }; + Ok(IOResult::IO) + } + } + CacheFlushState::AppendFrame { + current_page_to_append_idx, + } => { + let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx]; + let page = { let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(*page_id); + let page_key = PageCacheKey::new(page_id); let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!("cacheflush(page={}, page_type={:?})", page_id, page_type); - self.wal.borrow_mut().append_frame( - page.clone(), - 0, - self.flush_info.borrow().in_flight_writes.clone(), - )?; - page.clear_dirty(); - } - self.dirty_pages.borrow_mut().clear(); - self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrames; + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + + self.wal.borrow_mut().append_frame( + page.clone(), + 0, + self.flush_info.borrow().in_flight_writes.clone(), + )?; + page.clear_dirty(); + self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrame { + current_page_to_append_idx, + }; return Ok(IOResult::IO); } - CacheFlushState::WaitAppendFrames => { - let in_flight = *self.flush_info.borrow().in_flight_writes.borrow(); - if in_flight == 0 { - self.flush_info.borrow_mut().state = CacheFlushState::Start; - return Ok(IOResult::Done(())); + CacheFlushState::WaitAppendFrame { + current_page_to_append_idx, + } => { + let in_flight = self.flush_info.borrow().in_flight_writes.clone(); + if *in_flight.borrow() == 0 { + if current_page_to_append_idx == self.flush_info.borrow().dirty_pages.len() - 1 + { + self.dirty_pages.borrow_mut().clear(); + self.flush_info.borrow_mut().state = CacheFlushState::Start; + Ok(IOResult::Done(())) + } else { + self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame { + current_page_to_append_idx: current_page_to_append_idx + 1, + }; + Ok(IOResult::IO) + } } else { - return Ok(IOResult::IO); + Ok(IOResult::IO) } } } @@ -934,15 +985,35 @@ impl Pager { ) -> Result> { let mut checkpoint_result = CheckpointResult::default(); let res = loop { - let state = self.commit_info.borrow().state; + let state = self.commit_info.borrow().state.clone(); trace!(?state); match state { CommitState::Start => { - let db_size = header_accessor::get_database_size(self)?; - for (dirty_page_idx, page_id) in self.dirty_pages.borrow().iter().enumerate() { - let is_last_frame = dirty_page_idx == self.dirty_pages.borrow().len() - 1; + let dirty_pages = self + .dirty_pages + .borrow() + .iter() + .copied() + .collect::>(); + let mut commit_info = self.commit_info.borrow_mut(); + if dirty_pages.is_empty() { + return Ok(IOResult::Done(PagerCommitResult::WalWritten)); + } else { + commit_info.dirty_pages = dirty_pages; + commit_info.state = CommitState::AppendFrame { + current_page_to_append_idx: 0, + }; + } + } + CommitState::AppendFrame { + current_page_to_append_idx, + } => { + let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx]; + let is_last_frame = current_page_to_append_idx + == self.commit_info.borrow().dirty_pages.len() - 1; + let page = { let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(*page_id); + let page_key = PageCacheKey::new(page_id); let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); trace!( @@ -950,27 +1021,46 @@ impl Pager { page_id, page_type ); - let db_size = if is_last_frame { db_size } else { 0 }; - self.wal.borrow_mut().append_frame( - page.clone(), - db_size, - self.commit_info.borrow().in_flight_writes.clone(), - )?; - page.clear_dirty(); - } - // This is okay assuming we use shared cache by default. - { - let mut cache = self.page_cache.write(); - cache.clear().unwrap(); - } - self.dirty_pages.borrow_mut().clear(); - self.commit_info.borrow_mut().state = CommitState::WaitAppendFrames; - return Ok(IOResult::IO); + page + }; + + let db_size = { + let db_size = header_accessor::get_database_size(self)?; + if is_last_frame { + db_size + } else { + 0 + } + }; + self.wal.borrow_mut().append_frame( + page.clone(), + db_size, + self.commit_info.borrow().in_flight_writes.clone(), + )?; + page.clear_dirty(); + self.commit_info.borrow_mut().state = CommitState::WaitAppendFrame { + current_page_to_append_idx, + }; } - CommitState::WaitAppendFrames => { - let in_flight = *self.commit_info.borrow().in_flight_writes.borrow(); - if in_flight == 0 { - self.commit_info.borrow_mut().state = CommitState::SyncWal; + CommitState::WaitAppendFrame { + current_page_to_append_idx, + } => { + let in_flight = self.commit_info.borrow().in_flight_writes.clone(); + if *in_flight.borrow() == 0 { + if current_page_to_append_idx + == self.commit_info.borrow().dirty_pages.len() - 1 + { + { + let mut cache = self.page_cache.write(); + cache.clear().unwrap(); + } + self.dirty_pages.borrow_mut().clear(); + self.commit_info.borrow_mut().state = CommitState::SyncWal; + } else { + self.commit_info.borrow_mut().state = CommitState::AppendFrame { + current_page_to_append_idx: current_page_to_append_idx + 1, + } + } } else { return Ok(IOResult::IO); } From 5f8e386b48472367a7c4fa348c19ccd2fce636d7 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 17 Jul 2025 15:52:20 +0200 Subject: [PATCH 02/10] reset internal states on rollback --- core/storage/page_cache.rs | 2 +- core/storage/pager.rs | 19 ++++++++++++++++++- core/storage/wal.rs | 7 +++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 55a4fb249..a892a32fc 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -204,7 +204,7 @@ impl DumbLruPageCache { if clean_page { entry_mut.page.clear_loaded(); - debug!("cleaning up page {}", entry_mut.page.get().id); + debug!("clean(page={})", entry_mut.page.get().id); let _ = entry_mut.page.get().contents.take(); } self.unlink(entry); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 2d7d49210..4217c1ddc 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1014,7 +1014,10 @@ impl Pager { let page = { let mut cache = self.page_cache.write(); let page_key = PageCacheKey::new(page_id); - let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + let page = cache.get(&page_key).expect(&format!( + "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={}", + page_id + )); let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); trace!( "commit_dirty_pages(page={}, page_type={:?}", @@ -1528,6 +1531,9 @@ impl Pager { tracing::debug!(schema_did_change); self.dirty_pages.borrow_mut().clear(); let mut cache = self.page_cache.write(); + + self.reset_internal_states(); + cache.unset_dirty_all_pages(); cache.clear().expect("failed to clear page cache"); if schema_did_change { @@ -1537,6 +1543,17 @@ impl Pager { Ok(()) } + + fn reset_internal_states(&self) { + self.checkpoint_state.replace(CheckpointState::Checkpoint); + self.checkpoint_inflight.replace(0); + self.syncing.replace(false); + self.flush_info.replace(FlushInfo { + state: FlushState::Start, + in_flight_writes: Rc::new(RefCell::new(0)), + dirty_pages: Vec::new(), + }); + } } pub fn allocate_page(page_id: usize, buffer_pool: &Arc, offset: usize) -> PageRef { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 0c29ac161..05945d008 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1038,6 +1038,7 @@ impl Wal for WalFile { } self.last_checksum = shared.last_checksum; } + self.reset_internal_states(); Ok(()) } @@ -1128,6 +1129,12 @@ impl WalFile { } } } + + fn reset_internal_states(&self) { + self.sync_state.set(SyncState::NotSyncing); + self.syncing.set(false); + self.ongoing_checkpoint.state = CheckpointState::Start; + } } impl WalFileShared { From c397588ad6872392cdfbeac886644777e48693f2 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 17 Jul 2025 15:52:42 +0200 Subject: [PATCH 03/10] change connection state after finding error on I/O --- core/lib.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/core/lib.rs b/core/lib.rs index 311868578..fff2001f7 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -1217,7 +1217,19 @@ impl Statement { let state = self.program.connection.transaction_state.get(); if let TransactionState::Write { schema_did_change } = state { self.pager - .rollback(schema_did_change, &self.program.connection)? + .rollback(schema_did_change, &self.program.connection)?; + let end_tx_res = + self.pager + .end_tx(true, schema_did_change, &self.program.connection, true)?; + self.program + .connection + .transaction_state + .set(TransactionState::None); + assert!( + matches!(end_tx_res, PagerCacheflushStatus::Done(_)), + "end_tx should not return IO as it should just end txn without flushing anything. Got {:?}", + end_tx_res + ); } } res From 14de7c55afacfa7e2e8b04c3658722b2e403ed20 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 17 Jul 2025 15:57:49 +0200 Subject: [PATCH 04/10] set connection state to None in vdbe rollback --- core/lib.rs | 11 ++++++++--- core/storage/pager.rs | 7 ++++++- core/storage/wal.rs | 7 +++++-- core/vdbe/mod.rs | 5 ++++- 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index fff2001f7..4ea0234b7 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -1216,8 +1216,13 @@ impl Statement { if res.is_err() { let state = self.program.connection.transaction_state.get(); if let TransactionState::Write { schema_did_change } = state { - self.pager - .rollback(schema_did_change, &self.program.connection)?; + if let Err(e) = self + .pager + .rollback(schema_did_change, &self.program.connection) + { + // Let's panic for now as we don't want to leave state in a bad state. + panic!("rollback failed: {:?}", e); + } let end_tx_res = self.pager .end_tx(true, schema_did_change, &self.program.connection, true)?; @@ -1226,7 +1231,7 @@ impl Statement { .transaction_state .set(TransactionState::None); assert!( - matches!(end_tx_res, PagerCacheflushStatus::Done(_)), + matches!(end_tx_res, IOResult::Done(_)), "end_tx should not return IO as it should just end txn without flushing anything. Got {:?}", end_tx_res ); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 4217c1ddc..21ce0f822 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1549,7 +1549,12 @@ impl Pager { self.checkpoint_inflight.replace(0); self.syncing.replace(false); self.flush_info.replace(FlushInfo { - state: FlushState::Start, + state: CacheFlushState::Start, + in_flight_writes: Rc::new(RefCell::new(0)), + dirty_pages: Vec::new(), + }); + self.commit_info.replace(CommitInfo { + state: CommitState::Start, in_flight_writes: Rc::new(RefCell::new(0)), dirty_pages: Vec::new(), }); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 05945d008..a4666cae7 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1130,10 +1130,13 @@ impl WalFile { } } - fn reset_internal_states(&self) { + fn reset_internal_states(&mut self) { + self.ongoing_checkpoint.state = CheckpointState::Start; + self.ongoing_checkpoint.min_frame = 0; + self.ongoing_checkpoint.max_frame = 0; + self.ongoing_checkpoint.current_page = 0; self.sync_state.set(SyncState::NotSyncing); self.syncing.set(false); - self.ongoing_checkpoint.state = CheckpointState::Start; } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 48e1eb43f..c64f26688 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -418,7 +418,10 @@ impl Program { _ => { let state = self.connection.transaction_state.get(); if let TransactionState::Write { schema_did_change } = state { - pager.rollback(schema_did_change, &self.connection)? + pager.rollback(schema_did_change, &self.connection)?; + self.connection + .transaction_state + .replace(TransactionState::None); } } } From 5a1773edf17c92bbaeafe85b6aa9da64d4058a33 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 17 Jul 2025 17:46:10 +0200 Subject: [PATCH 05/10] clippy --- core/storage/pager.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 21ce0f822..bbceee0ca 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -985,7 +985,7 @@ impl Pager { ) -> Result> { let mut checkpoint_result = CheckpointResult::default(); let res = loop { - let state = self.commit_info.borrow().state.clone(); + let state = self.commit_info.borrow().state; trace!(?state); match state { CommitState::Start => { @@ -1014,10 +1014,12 @@ impl Pager { let page = { let mut cache = self.page_cache.write(); let page_key = PageCacheKey::new(page_id); - let page = cache.get(&page_key).expect(&format!( - "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={}", - page_id - )); + let page = cache.get(&page_key).unwrap_or_else(|| { + panic!( + "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={}", + page_id + ) + }); let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); trace!( "commit_dirty_pages(page={}, page_type={:?}", From d77c899fa633af479c6a84af2c0530ac2808cb80 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 18 Jul 2025 13:53:17 +0200 Subject: [PATCH 06/10] clippy --- core/lib.rs | 5 ++--- core/storage/pager.rs | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 4ea0234b7..2dc6d6bf7 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -1221,7 +1221,7 @@ impl Statement { .rollback(schema_did_change, &self.program.connection) { // Let's panic for now as we don't want to leave state in a bad state. - panic!("rollback failed: {:?}", e); + panic!("rollback failed: {e:?}"); } let end_tx_res = self.pager @@ -1232,8 +1232,7 @@ impl Statement { .set(TransactionState::None); assert!( matches!(end_tx_res, IOResult::Done(_)), - "end_tx should not return IO as it should just end txn without flushing anything. Got {:?}", - end_tx_res + "end_tx should not return IO as it should just end txn without flushing anything. Got {end_tx_res:?}" ); } } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index bbceee0ca..c33004b86 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1016,8 +1016,7 @@ impl Pager { let page_key = PageCacheKey::new(page_id); let page = cache.get(&page_key).unwrap_or_else(|| { panic!( - "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={}", - page_id + "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}" ) }); let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); From 674d88e1407a334c8169afb61abcf59d4d18ce6b Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 21 Jul 2025 15:42:01 +0200 Subject: [PATCH 07/10] do not clear dirty pages on cacheflush::start --- core/storage/pager.rs | 53 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index c33004b86..ca5392a58 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -905,7 +905,6 @@ impl Pager { trace!(?state); match state { CacheFlushState::Start => { - self.dirty_pages.borrow_mut().clear(); let dirty_pages = self .dirty_pages .borrow() @@ -945,7 +944,6 @@ impl Pager { 0, self.flush_info.borrow().in_flight_writes.clone(), )?; - page.clear_dirty(); self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrame { current_page_to_append_idx, }; @@ -956,8 +954,25 @@ impl Pager { } => { let in_flight = self.flush_info.borrow().in_flight_writes.clone(); if *in_flight.borrow() == 0 { - if current_page_to_append_idx == self.flush_info.borrow().dirty_pages.len() - 1 - { + // Clear dirty now + let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx]; + let page = { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + page.clear_dirty(); + // Continue with next page + let is_last_page = current_page_to_append_idx + == self.flush_info.borrow().dirty_pages.len() - 1; + if is_last_page { self.dirty_pages.borrow_mut().clear(); self.flush_info.borrow_mut().state = CacheFlushState::Start; Ok(IOResult::Done(())) @@ -1041,7 +1056,6 @@ impl Pager { db_size, self.commit_info.borrow().in_flight_writes.clone(), )?; - page.clear_dirty(); self.commit_info.borrow_mut().state = CommitState::WaitAppendFrame { current_page_to_append_idx, }; @@ -1051,9 +1065,32 @@ impl Pager { } => { let in_flight = self.commit_info.borrow().in_flight_writes.clone(); if *in_flight.borrow() == 0 { - if current_page_to_append_idx - == self.commit_info.borrow().dirty_pages.len() - 1 - { + // First clear dirty + let page_id = + self.commit_info.borrow().dirty_pages[current_page_to_append_idx]; + let page = { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).unwrap_or_else(|| { + panic!( + "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}" + ) + }); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + page.clear_dirty(); + + // Now advance to next page if there are more + let is_last_frame = current_page_to_append_idx + == self.commit_info.borrow().dirty_pages.len() - 1; + if is_last_frame { + // Let's clear the page cache now { let mut cache = self.page_cache.write(); cache.clear().unwrap(); From 75f9c23ed3623c1207b9dad9eb3e2911c69269fd Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 22 Jul 2025 18:31:04 +0200 Subject: [PATCH 08/10] end txn on vdbe failures --- core/vdbe/mod.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index c64f26688..25c05dab3 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -418,7 +418,22 @@ impl Program { _ => { let state = self.connection.transaction_state.get(); if let TransactionState::Write { schema_did_change } = state { - pager.rollback(schema_did_change, &self.connection)?; + if let Err(e) = pager.rollback(schema_did_change, &self.connection) + { + tracing::error!("rollback failed: {e}"); + } + if let Err(e) = + pager.end_tx(false, schema_did_change, &self.connection, false) + { + tracing::error!("end_tx failed: {e}"); + } + self.connection + .transaction_state + .replace(TransactionState::None); + } else { + if let Err(e) = pager.end_read_tx() { + tracing::error!("end_read_tx failed: {e}"); + } self.connection .transaction_state .replace(TransactionState::None); From b07e57d9d17ba13625d1b9d0a5239347c8764edb Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 23 Jul 2025 11:59:54 +0200 Subject: [PATCH 09/10] review fixes --- core/lib.rs | 7 +-- core/storage/pager.rs | 122 +++++++++++++++++++++--------------------- core/vdbe/mod.rs | 59 ++++++++++---------- 3 files changed, 92 insertions(+), 96 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 2dc6d6bf7..0a661f6f8 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -805,11 +805,8 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } let res = self._db.io.run_once(); - if res.is_err() { - let state = self.transaction_state.get(); - if let TransactionState::Write { schema_did_change } = state { - self.pager.borrow().rollback(schema_did_change, self)? - } + if let Err(ref e) = res { + vdbe::handle_program_error(&self.pager.borrow(), self, e)?; } res } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index ca5392a58..286de874d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -953,36 +953,36 @@ impl Pager { current_page_to_append_idx, } => { let in_flight = self.flush_info.borrow().in_flight_writes.clone(); - if *in_flight.borrow() == 0 { - // Clear dirty now - let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx]; - let page = { - let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(page_id); - let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!( - "commit_dirty_pages(page={}, page_type={:?}", - page_id, - page_type - ); - page - }; - page.clear_dirty(); - // Continue with next page - let is_last_page = current_page_to_append_idx - == self.flush_info.borrow().dirty_pages.len() - 1; - if is_last_page { - self.dirty_pages.borrow_mut().clear(); - self.flush_info.borrow_mut().state = CacheFlushState::Start; - Ok(IOResult::Done(())) - } else { - self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame { - current_page_to_append_idx: current_page_to_append_idx + 1, - }; - Ok(IOResult::IO) - } + if *in_flight.borrow() > 0 { + return Ok(IOResult::IO); + } + + // Clear dirty now + let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx]; + let page = { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + page.clear_dirty(); + // Continue with next page + let is_last_page = + current_page_to_append_idx == self.flush_info.borrow().dirty_pages.len() - 1; + if is_last_page { + self.dirty_pages.borrow_mut().clear(); + self.flush_info.borrow_mut().state = CacheFlushState::Start; + Ok(IOResult::Done(())) } else { + self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame { + current_page_to_append_idx: current_page_to_append_idx + 1, + }; Ok(IOResult::IO) } } @@ -1064,46 +1064,44 @@ impl Pager { current_page_to_append_idx, } => { let in_flight = self.commit_info.borrow().in_flight_writes.clone(); - if *in_flight.borrow() == 0 { - // First clear dirty - let page_id = - self.commit_info.borrow().dirty_pages[current_page_to_append_idx]; - let page = { - let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(page_id); - let page = cache.get(&page_key).unwrap_or_else(|| { + if *in_flight.borrow() > 0 { + return Ok(IOResult::IO); + } + // First clear dirty + let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx]; + let page = { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).unwrap_or_else(|| { panic!( "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}" ) }); - let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!( - "commit_dirty_pages(page={}, page_type={:?}", - page_id, - page_type - ); - page - }; - page.clear_dirty(); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + page.clear_dirty(); - // Now advance to next page if there are more - let is_last_frame = current_page_to_append_idx - == self.commit_info.borrow().dirty_pages.len() - 1; - if is_last_frame { - // Let's clear the page cache now - { - let mut cache = self.page_cache.write(); - cache.clear().unwrap(); - } - self.dirty_pages.borrow_mut().clear(); - self.commit_info.borrow_mut().state = CommitState::SyncWal; - } else { - self.commit_info.borrow_mut().state = CommitState::AppendFrame { - current_page_to_append_idx: current_page_to_append_idx + 1, - } + // Now advance to next page if there are more + let is_last_frame = current_page_to_append_idx + == self.commit_info.borrow().dirty_pages.len() - 1; + if is_last_frame { + // Let's clear the page cache now + { + let mut cache = self.page_cache.write(); + cache.clear().unwrap(); } + self.dirty_pages.borrow_mut().clear(); + self.commit_info.borrow_mut().state = CommitState::SyncWal; } else { - return Ok(IOResult::IO); + self.commit_info.borrow_mut().state = CommitState::AppendFrame { + current_page_to_append_idx: current_page_to_append_idx + 1, + } } } CommitState::SyncWal => { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 25c05dab3..6a337d934 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -413,35 +413,8 @@ impl Program { Ok(InsnFunctionStepResult::Interrupt) => return Ok(StepResult::Interrupt), Ok(InsnFunctionStepResult::Busy) => return Ok(StepResult::Busy), Err(err) => { - match err { - LimboError::TxError(_) => {} - _ => { - let state = self.connection.transaction_state.get(); - if let TransactionState::Write { schema_did_change } = state { - if let Err(e) = pager.rollback(schema_did_change, &self.connection) - { - tracing::error!("rollback failed: {e}"); - } - if let Err(e) = - pager.end_tx(false, schema_did_change, &self.connection, false) - { - tracing::error!("end_tx failed: {e}"); - } - self.connection - .transaction_state - .replace(TransactionState::None); - } else { - if let Err(e) = pager.end_read_tx() { - tracing::error!("end_read_tx failed: {e}"); - } - self.connection - .transaction_state - .replace(TransactionState::None); - } - } - } - let err = Err(err); - return err; + handle_program_error(&pager, &self.connection, &err)?; + return Err(err); } } } @@ -769,3 +742,31 @@ impl Row { self.count } } + +/// Handle a program error by rolling back the transaction +pub fn handle_program_error( + pager: &Rc, + connection: &Connection, + err: &LimboError, +) -> Result<()> { + match err { + LimboError::TxError(_) => {} + _ => { + let state = connection.transaction_state.get(); + if let TransactionState::Write { schema_did_change } = state { + if let Err(e) = pager.rollback(schema_did_change, connection) { + tracing::error!("rollback failed: {e}"); + } + if let Err(e) = pager.end_tx(false, schema_did_change, connection, false) { + tracing::error!("end_tx failed: {e}"); + } + } else { + if let Err(e) = pager.end_read_tx() { + tracing::error!("end_read_tx failed: {e}"); + } + } + connection.transaction_state.replace(TransactionState::None); + } + } + Ok(()) +} From ce598b772e7c2d43cb4bd1fdb593ef932742a67f Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 23 Jul 2025 12:03:25 +0200 Subject: [PATCH 10/10] clippy i hate you so much --- core/vdbe/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 6a337d934..f7ab42cf2 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -760,10 +760,8 @@ pub fn handle_program_error( if let Err(e) = pager.end_tx(false, schema_did_change, connection, false) { tracing::error!("end_tx failed: {e}"); } - } else { - if let Err(e) = pager.end_read_tx() { - tracing::error!("end_read_tx failed: {e}"); - } + } else if let Err(e) = pager.end_read_tx() { + tracing::error!("end_read_tx failed: {e}"); } connection.transaction_state.replace(TransactionState::None); }