diff --git a/core/lib.rs b/core/lib.rs index 311868578..0a661f6f8 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -805,11 +805,8 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } let res = self._db.io.run_once(); - if res.is_err() { - let state = self.transaction_state.get(); - if let TransactionState::Write { schema_did_change } = state { - self.pager.borrow().rollback(schema_did_change, self)? - } + if let Err(ref e) = res { + vdbe::handle_program_error(&self.pager.borrow(), self, e)?; } res } @@ -1216,8 +1213,24 @@ impl Statement { if res.is_err() { let state = self.program.connection.transaction_state.get(); if let TransactionState::Write { schema_did_change } = state { - self.pager - .rollback(schema_did_change, &self.program.connection)? + if let Err(e) = self + .pager + .rollback(schema_did_change, &self.program.connection) + { + // Let's panic for now as we don't want to leave state in a bad state. + panic!("rollback failed: {e:?}"); + } + let end_tx_res = + self.pager + .end_tx(true, schema_did_change, &self.program.connection, true)?; + self.program + .connection + .transaction_state + .set(TransactionState::None); + assert!( + matches!(end_tx_res, IOResult::Done(_)), + "end_tx should not return IO as it should just end txn without flushing anything. Got {end_tx_res:?}" + ); } } res diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 55a4fb249..a892a32fc 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -204,7 +204,7 @@ impl DumbLruPageCache { if clean_page { entry_mut.page.clear_loaded(); - debug!("cleaning up page {}", entry_mut.page.get().id); + debug!("clean(page={})", entry_mut.page.get().id); let _ = entry_mut.page.get().contents.take(); } self.unlink(entry); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 7792bae0e..963fd2942 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -186,8 +186,10 @@ impl Page { enum CacheFlushState { /// Idle. Start, - /// Waiting for all in-flight writes to the on-disk WAL to complete. - WaitAppendFrames, + /// Append a single frame to the WAL. + AppendFrame { current_page_to_append_idx: usize }, + /// Wait for append frame to complete. + WaitAppendFrame { current_page_to_append_idx: usize }, } #[derive(Clone, Copy, Debug)] @@ -195,8 +197,11 @@ enum CacheFlushState { enum CommitState { /// Idle. Start, - /// Waiting for all in-flight writes to the on-disk WAL to complete. - WaitAppendFrames, + /// Append a single frame to the WAL. + AppendFrame { current_page_to_append_idx: usize }, + /// Wait for append frame to complete. + /// If the current page is the last page to append, sync wal and clear dirty pages and cache. + WaitAppendFrame { current_page_to_append_idx: usize }, /// Fsync the on-disk WAL. SyncWal, /// Checkpoint the WAL to the database file (if needed). @@ -230,6 +235,8 @@ struct CommitInfo { state: CommitState, /// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync. in_flight_writes: Rc>, + /// Dirty pages to be flushed. + dirty_pages: Vec, } /// This will keep track of the state of current cache flush in order to not repeat work @@ -237,6 +244,8 @@ struct FlushInfo { state: CacheFlushState, /// Number of writes taking place. in_flight_writes: Rc>, + /// Dirty pages to be flushed. + dirty_pages: Vec, } /// Track the state of the auto-vacuum mode. @@ -394,6 +403,7 @@ impl Pager { commit_info: RefCell::new(CommitInfo { state: CommitState::Start, in_flight_writes: Rc::new(RefCell::new(0)), + dirty_pages: Vec::new(), }), syncing: Rc::new(RefCell::new(false)), checkpoint_state: RefCell::new(CheckpointState::Checkpoint), @@ -408,6 +418,7 @@ impl Pager { flush_info: RefCell::new(FlushInfo { state: CacheFlushState::Start, in_flight_writes: Rc::new(RefCell::new(0)), + dirty_pages: Vec::new(), }), free_page_state: RefCell::new(FreePageState::Start), }) @@ -899,31 +910,86 @@ impl Pager { trace!(?state); match state { CacheFlushState::Start => { - for page_id in self.dirty_pages.borrow().iter() { + let dirty_pages = self + .dirty_pages + .borrow() + .iter() + .copied() + .collect::>(); + let mut flush_info = self.flush_info.borrow_mut(); + if dirty_pages.is_empty() { + Ok(IOResult::Done(())) + } else { + flush_info.dirty_pages = dirty_pages; + flush_info.state = CacheFlushState::AppendFrame { + current_page_to_append_idx: 0, + }; + Ok(IOResult::IO) + } + } + CacheFlushState::AppendFrame { + current_page_to_append_idx, + } => { + let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx]; + let page = { let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(*page_id); + let page_key = PageCacheKey::new(page_id); let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!("cacheflush(page={}, page_type={:?})", page_id, page_type); - self.wal.borrow_mut().append_frame( - page.clone(), - 0, - self.flush_info.borrow().in_flight_writes.clone(), - )?; - page.clear_dirty(); - } - self.dirty_pages.borrow_mut().clear(); - self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrames; + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + + self.wal.borrow_mut().append_frame( + page.clone(), + 0, + self.flush_info.borrow().in_flight_writes.clone(), + )?; + self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrame { + current_page_to_append_idx, + }; return Ok(IOResult::IO); } - CacheFlushState::WaitAppendFrames => { - let in_flight = *self.flush_info.borrow().in_flight_writes.borrow(); - if in_flight == 0 { - self.flush_info.borrow_mut().state = CacheFlushState::Start; - return Ok(IOResult::Done(())); - } else { + CacheFlushState::WaitAppendFrame { + current_page_to_append_idx, + } => { + let in_flight = self.flush_info.borrow().in_flight_writes.clone(); + if *in_flight.borrow() > 0 { return Ok(IOResult::IO); } + + // Clear dirty now + let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx]; + let page = { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + page.clear_dirty(); + // Continue with next page + let is_last_page = + current_page_to_append_idx == self.flush_info.borrow().dirty_pages.len() - 1; + if is_last_page { + self.dirty_pages.borrow_mut().clear(); + self.flush_info.borrow_mut().state = CacheFlushState::Start; + Ok(IOResult::Done(())) + } else { + self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame { + current_page_to_append_idx: current_page_to_append_idx + 1, + }; + Ok(IOResult::IO) + } } } } @@ -943,41 +1009,104 @@ impl Pager { trace!(?state); match state { CommitState::Start => { - let db_size = header_accessor::get_database_size(self)?; - for (dirty_page_idx, page_id) in self.dirty_pages.borrow().iter().enumerate() { - let is_last_frame = dirty_page_idx == self.dirty_pages.borrow().len() - 1; + let dirty_pages = self + .dirty_pages + .borrow() + .iter() + .copied() + .collect::>(); + let mut commit_info = self.commit_info.borrow_mut(); + if dirty_pages.is_empty() { + return Ok(IOResult::Done(PagerCommitResult::WalWritten)); + } else { + commit_info.dirty_pages = dirty_pages; + commit_info.state = CommitState::AppendFrame { + current_page_to_append_idx: 0, + }; + } + } + CommitState::AppendFrame { + current_page_to_append_idx, + } => { + let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx]; + let is_last_frame = current_page_to_append_idx + == self.commit_info.borrow().dirty_pages.len() - 1; + let page = { let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(*page_id); - let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).unwrap_or_else(|| { + panic!( + "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}" + ) + }); let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); trace!( "commit_dirty_pages(page={}, page_type={:?}", page_id, page_type ); - let db_size = if is_last_frame { db_size } else { 0 }; - self.wal.borrow_mut().append_frame( - page.clone(), - db_size, - self.commit_info.borrow().in_flight_writes.clone(), - )?; - page.clear_dirty(); - } - // This is okay assuming we use shared cache by default. - { - let mut cache = self.page_cache.write(); - cache.clear().unwrap(); - } - self.dirty_pages.borrow_mut().clear(); - self.commit_info.borrow_mut().state = CommitState::WaitAppendFrames; - return Ok(IOResult::IO); + page + }; + + let db_size = { + let db_size = header_accessor::get_database_size(self)?; + if is_last_frame { + db_size + } else { + 0 + } + }; + self.wal.borrow_mut().append_frame( + page.clone(), + db_size, + self.commit_info.borrow().in_flight_writes.clone(), + )?; + self.commit_info.borrow_mut().state = CommitState::WaitAppendFrame { + current_page_to_append_idx, + }; } - CommitState::WaitAppendFrames => { - let in_flight = *self.commit_info.borrow().in_flight_writes.borrow(); - if in_flight == 0 { + CommitState::WaitAppendFrame { + current_page_to_append_idx, + } => { + let in_flight = self.commit_info.borrow().in_flight_writes.clone(); + if *in_flight.borrow() > 0 { + return Ok(IOResult::IO); + } + // First clear dirty + let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx]; + let page = { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).unwrap_or_else(|| { + panic!( + "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}" + ) + }); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + page.clear_dirty(); + + // Now advance to next page if there are more + let is_last_frame = current_page_to_append_idx + == self.commit_info.borrow().dirty_pages.len() - 1; + if is_last_frame { + // Let's clear the page cache now + { + let mut cache = self.page_cache.write(); + cache.clear().unwrap(); + } + self.dirty_pages.borrow_mut().clear(); self.commit_info.borrow_mut().state = CommitState::SyncWal; } else { - return Ok(IOResult::IO); + self.commit_info.borrow_mut().state = CommitState::AppendFrame { + current_page_to_append_idx: current_page_to_append_idx + 1, + } } } CommitState::SyncWal => { @@ -1443,6 +1572,9 @@ impl Pager { tracing::debug!(schema_did_change); self.dirty_pages.borrow_mut().clear(); let mut cache = self.page_cache.write(); + + self.reset_internal_states(); + cache.unset_dirty_all_pages(); cache.clear().expect("failed to clear page cache"); if schema_did_change { @@ -1452,6 +1584,22 @@ impl Pager { Ok(()) } + + fn reset_internal_states(&self) { + self.checkpoint_state.replace(CheckpointState::Checkpoint); + self.checkpoint_inflight.replace(0); + self.syncing.replace(false); + self.flush_info.replace(FlushInfo { + state: CacheFlushState::Start, + in_flight_writes: Rc::new(RefCell::new(0)), + dirty_pages: Vec::new(), + }); + self.commit_info.replace(CommitInfo { + state: CommitState::Start, + in_flight_writes: Rc::new(RefCell::new(0)), + dirty_pages: Vec::new(), + }); + } } pub fn allocate_page(page_id: usize, buffer_pool: &Arc, offset: usize) -> PageRef { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 9fb09be8d..599cd6826 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1040,6 +1040,7 @@ impl Wal for WalFile { } self.last_checksum = shared.last_checksum; } + self.reset_internal_states(); Ok(()) } @@ -1130,6 +1131,15 @@ impl WalFile { } } } + + fn reset_internal_states(&mut self) { + self.ongoing_checkpoint.state = CheckpointState::Start; + self.ongoing_checkpoint.min_frame = 0; + self.ongoing_checkpoint.max_frame = 0; + self.ongoing_checkpoint.current_page = 0; + self.sync_state.set(SyncState::NotSyncing); + self.syncing.set(false); + } } impl WalFileShared { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 48e1eb43f..f7ab42cf2 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -413,17 +413,8 @@ impl Program { Ok(InsnFunctionStepResult::Interrupt) => return Ok(StepResult::Interrupt), Ok(InsnFunctionStepResult::Busy) => return Ok(StepResult::Busy), Err(err) => { - match err { - LimboError::TxError(_) => {} - _ => { - let state = self.connection.transaction_state.get(); - if let TransactionState::Write { schema_did_change } = state { - pager.rollback(schema_did_change, &self.connection)? - } - } - } - let err = Err(err); - return err; + handle_program_error(&pager, &self.connection, &err)?; + return Err(err); } } } @@ -751,3 +742,29 @@ impl Row { self.count } } + +/// Handle a program error by rolling back the transaction +pub fn handle_program_error( + pager: &Rc, + connection: &Connection, + err: &LimboError, +) -> Result<()> { + match err { + LimboError::TxError(_) => {} + _ => { + let state = connection.transaction_state.get(); + if let TransactionState::Write { schema_did_change } = state { + if let Err(e) = pager.rollback(schema_did_change, connection) { + tracing::error!("rollback failed: {e}"); + } + if let Err(e) = pager.end_tx(false, schema_did_change, connection, false) { + tracing::error!("end_tx failed: {e}"); + } + } else if let Err(e) = pager.end_read_tx() { + tracing::error!("end_read_tx failed: {e}"); + } + connection.transaction_state.replace(TransactionState::None); + } + } + Ok(()) +}