diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 7fe006240..a668e7bbc 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -9,12 +9,7 @@ use tracing::{instrument, Level}; use std::fmt::Formatter; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::{ - cell::{Cell, RefCell}, - fmt, - rc::Rc, - sync::Arc, -}; +use std::{cell::Cell, fmt, rc::Rc, sync::Arc}; use crate::fast_lock::SpinLock; use crate::io::{File, IO}; @@ -262,12 +257,7 @@ pub trait Wal { /// db_size > 0 -> last frame written in transaction /// db_size == 0 -> non-last frame written in transaction /// write_counter is the counter we use to track when the I/O operation starts and completes - fn append_frame( - &mut self, - page: PageRef, - db_size: u32, - write_counter: Rc>, - ) -> Result; + fn append_frame(&mut self, page: PageRef, db_size: u32) -> Result; /// Complete append of frames by updating shared wal state. Before this /// all changes were stored locally. @@ -277,7 +267,6 @@ pub trait Wal { fn checkpoint( &mut self, pager: &Pager, - write_counter: Rc>, mode: CheckpointMode, ) -> Result>; fn sync(&mut self) -> Result>; @@ -1053,12 +1042,7 @@ impl Wal for WalFile { /// Write a frame to the WAL. #[instrument(skip_all, level = Level::DEBUG)] - fn append_frame( - &mut self, - page: PageRef, - db_size: u32, - write_counter: Rc>, - ) -> Result { + fn append_frame(&mut self, page: PageRef, db_size: u32) -> Result { let shared = self.get_shared(); if shared.max_frame.load(Ordering::Acquire).eq(&0) { self.ensure_header_if_needed()?; @@ -1084,10 +1068,8 @@ impl Wal for WalFile { page_buf, ); - *write_counter.borrow_mut() += 1; let c = Completion::new_write({ let frame_bytes = frame_bytes.clone(); - let write_counter = write_counter.clone(); move |bytes_written| { let frame_len = frame_bytes.len(); turso_assert!( @@ -1096,15 +1078,10 @@ impl Wal for WalFile { ); page.clear_dirty(); - *write_counter.borrow_mut() -= 1; } }); - let result = shared.file.pwrite(offset, frame_bytes.clone(), c); - if let Err(err) = result { - *write_counter.borrow_mut() -= 1; - return Err(err); - } - (result.unwrap(), frame_checksums) + let result = shared.file.pwrite(offset, frame_bytes.clone(), c)?; + (result, frame_checksums) }; self.complete_append_frame(page_id as u64, frame_id, checksums); Ok(c) @@ -1122,14 +1099,12 @@ impl Wal for WalFile { fn checkpoint( &mut self, pager: &Pager, - _write_counter: Rc>, mode: CheckpointMode, ) -> Result> { - self.checkpoint_inner(pager, _write_counter, mode) - .inspect_err(|_| { - let _ = self.checkpoint_guard.take(); - self.ongoing_checkpoint.state = CheckpointState::Start; - }) + self.checkpoint_inner(pager, mode).inspect_err(|_| { + let _ = self.checkpoint_guard.take(); + self.ongoing_checkpoint.state = CheckpointState::Start; + }) } #[instrument(err, skip_all, level = Level::DEBUG)] @@ -1371,7 +1346,6 @@ impl WalFile { fn checkpoint_inner( &mut self, pager: &Pager, - _write_counter: Rc>, mode: CheckpointMode, ) -> Result> { 'checkpoint_loop: loop { @@ -1913,7 +1887,7 @@ pub mod test { #[cfg(unix)] use std::os::unix::fs::MetadataExt; use std::{ - cell::{Cell, RefCell, UnsafeCell}, + cell::{Cell, UnsafeCell}, rc::Rc, sync::{atomic::Ordering, Arc}, }; @@ -2030,11 +2004,7 @@ pub mod test { pager: &crate::Pager, mode: CheckpointMode, ) -> CheckpointResult { - let wc = Rc::new(RefCell::new(0usize)); - pager - .io - .block(|| wal.checkpoint(pager, wc.clone(), mode)) - .unwrap() + pager.io.block(|| wal.checkpoint(pager, mode)).unwrap() } fn wal_header_snapshot(shared: &Arc>) -> (u32, u32, u32, u32) { @@ -2233,9 +2203,9 @@ pub mod test { let p = conn1.pager.borrow(); let mut w = p.wal.as_ref().unwrap().borrow_mut(); loop { - match w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) { - Ok(IOResult::IO) => { - conn1.run_once().unwrap(); + match w.checkpoint(&p, CheckpointMode::Restart) { + Ok(IOResult::IO(io)) => { + io.wait(db.io.as_ref()).unwrap(); } e => { assert!( @@ -2262,9 +2232,9 @@ pub mod test { let p = conn1.pager.borrow(); let mut w = p.wal.as_ref().unwrap().borrow_mut(); loop { - match w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) { - Ok(IOResult::IO) => { - conn1.run_once().unwrap(); + match w.checkpoint(&p, CheckpointMode::Restart) { + Ok(IOResult::IO(io)) => { + io.wait(db.io.as_ref()).unwrap(); } Ok(IOResult::Done(_)) => { panic!("Checkpoint should not have succeeded"); @@ -2430,7 +2400,7 @@ pub mod test { let result = { let pager = conn1.pager.borrow(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); - wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Restart) + wal.checkpoint(&pager, CheckpointMode::Restart) }; assert!( @@ -2793,7 +2763,7 @@ pub mod test { { let pager = conn1.pager.borrow(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); - let result = wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Restart); + let result = wal.checkpoint(&pager, CheckpointMode::Restart); assert!( matches!(result, Err(LimboError::Busy)), @@ -2893,12 +2863,12 @@ pub mod test { { let pager = writer.pager.borrow(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); - match wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Full) { - Ok(IOResult::IO) => { + match wal.checkpoint(&pager, CheckpointMode::Full) { + Ok(IOResult::IO(io)) => { // Drive any pending IO (should quickly become Busy or Done) - writer.run_once().unwrap(); + io.wait(db.io.as_ref()).unwrap(); // Call again to see final state - match wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Full) { + match wal.checkpoint(&pager, CheckpointMode::Full) { Err(LimboError::Busy) => {} other => panic!("expected Busy from FULL with old reader, got {other:?}"), }