mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-09 18:24:20 +01:00
remove write counters from Wal impls
This commit is contained in:
@@ -9,12 +9,7 @@ use tracing::{instrument, Level};
|
||||
|
||||
use std::fmt::Formatter;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::{
|
||||
cell::{Cell, RefCell},
|
||||
fmt,
|
||||
rc::Rc,
|
||||
sync::Arc,
|
||||
};
|
||||
use std::{cell::Cell, fmt, rc::Rc, sync::Arc};
|
||||
|
||||
use crate::fast_lock::SpinLock;
|
||||
use crate::io::{File, IO};
|
||||
@@ -262,12 +257,7 @@ pub trait Wal {
|
||||
/// db_size > 0 -> last frame written in transaction
|
||||
/// db_size == 0 -> non-last frame written in transaction
|
||||
/// write_counter is the counter we use to track when the I/O operation starts and completes
|
||||
fn append_frame(
|
||||
&mut self,
|
||||
page: PageRef,
|
||||
db_size: u32,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<Completion>;
|
||||
fn append_frame(&mut self, page: PageRef, db_size: u32) -> Result<Completion>;
|
||||
|
||||
/// Complete append of frames by updating shared wal state. Before this
|
||||
/// all changes were stored locally.
|
||||
@@ -277,7 +267,6 @@ pub trait Wal {
|
||||
fn checkpoint(
|
||||
&mut self,
|
||||
pager: &Pager,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
mode: CheckpointMode,
|
||||
) -> Result<IOResult<CheckpointResult>>;
|
||||
fn sync(&mut self) -> Result<IOResult<()>>;
|
||||
@@ -1053,12 +1042,7 @@ impl Wal for WalFile {
|
||||
|
||||
/// Write a frame to the WAL.
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn append_frame(
|
||||
&mut self,
|
||||
page: PageRef,
|
||||
db_size: u32,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<Completion> {
|
||||
fn append_frame(&mut self, page: PageRef, db_size: u32) -> Result<Completion> {
|
||||
let shared = self.get_shared();
|
||||
if shared.max_frame.load(Ordering::Acquire).eq(&0) {
|
||||
self.ensure_header_if_needed()?;
|
||||
@@ -1084,10 +1068,8 @@ impl Wal for WalFile {
|
||||
page_buf,
|
||||
);
|
||||
|
||||
*write_counter.borrow_mut() += 1;
|
||||
let c = Completion::new_write({
|
||||
let frame_bytes = frame_bytes.clone();
|
||||
let write_counter = write_counter.clone();
|
||||
move |bytes_written| {
|
||||
let frame_len = frame_bytes.len();
|
||||
turso_assert!(
|
||||
@@ -1096,15 +1078,10 @@ impl Wal for WalFile {
|
||||
);
|
||||
|
||||
page.clear_dirty();
|
||||
*write_counter.borrow_mut() -= 1;
|
||||
}
|
||||
});
|
||||
let result = shared.file.pwrite(offset, frame_bytes.clone(), c);
|
||||
if let Err(err) = result {
|
||||
*write_counter.borrow_mut() -= 1;
|
||||
return Err(err);
|
||||
}
|
||||
(result.unwrap(), frame_checksums)
|
||||
let result = shared.file.pwrite(offset, frame_bytes.clone(), c)?;
|
||||
(result, frame_checksums)
|
||||
};
|
||||
self.complete_append_frame(page_id as u64, frame_id, checksums);
|
||||
Ok(c)
|
||||
@@ -1122,14 +1099,12 @@ impl Wal for WalFile {
|
||||
fn checkpoint(
|
||||
&mut self,
|
||||
pager: &Pager,
|
||||
_write_counter: Rc<RefCell<usize>>,
|
||||
mode: CheckpointMode,
|
||||
) -> Result<IOResult<CheckpointResult>> {
|
||||
self.checkpoint_inner(pager, _write_counter, mode)
|
||||
.inspect_err(|_| {
|
||||
let _ = self.checkpoint_guard.take();
|
||||
self.ongoing_checkpoint.state = CheckpointState::Start;
|
||||
})
|
||||
self.checkpoint_inner(pager, mode).inspect_err(|_| {
|
||||
let _ = self.checkpoint_guard.take();
|
||||
self.ongoing_checkpoint.state = CheckpointState::Start;
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::DEBUG)]
|
||||
@@ -1371,7 +1346,6 @@ impl WalFile {
|
||||
fn checkpoint_inner(
|
||||
&mut self,
|
||||
pager: &Pager,
|
||||
_write_counter: Rc<RefCell<usize>>,
|
||||
mode: CheckpointMode,
|
||||
) -> Result<IOResult<CheckpointResult>> {
|
||||
'checkpoint_loop: loop {
|
||||
@@ -1913,7 +1887,7 @@ pub mod test {
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
use std::{
|
||||
cell::{Cell, RefCell, UnsafeCell},
|
||||
cell::{Cell, UnsafeCell},
|
||||
rc::Rc,
|
||||
sync::{atomic::Ordering, Arc},
|
||||
};
|
||||
@@ -2030,11 +2004,7 @@ pub mod test {
|
||||
pager: &crate::Pager,
|
||||
mode: CheckpointMode,
|
||||
) -> CheckpointResult {
|
||||
let wc = Rc::new(RefCell::new(0usize));
|
||||
pager
|
||||
.io
|
||||
.block(|| wal.checkpoint(pager, wc.clone(), mode))
|
||||
.unwrap()
|
||||
pager.io.block(|| wal.checkpoint(pager, mode)).unwrap()
|
||||
}
|
||||
|
||||
fn wal_header_snapshot(shared: &Arc<UnsafeCell<WalFileShared>>) -> (u32, u32, u32, u32) {
|
||||
@@ -2233,9 +2203,9 @@ pub mod test {
|
||||
let p = conn1.pager.borrow();
|
||||
let mut w = p.wal.as_ref().unwrap().borrow_mut();
|
||||
loop {
|
||||
match w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) {
|
||||
Ok(IOResult::IO) => {
|
||||
conn1.run_once().unwrap();
|
||||
match w.checkpoint(&p, CheckpointMode::Restart) {
|
||||
Ok(IOResult::IO(io)) => {
|
||||
io.wait(db.io.as_ref()).unwrap();
|
||||
}
|
||||
e => {
|
||||
assert!(
|
||||
@@ -2262,9 +2232,9 @@ pub mod test {
|
||||
let p = conn1.pager.borrow();
|
||||
let mut w = p.wal.as_ref().unwrap().borrow_mut();
|
||||
loop {
|
||||
match w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) {
|
||||
Ok(IOResult::IO) => {
|
||||
conn1.run_once().unwrap();
|
||||
match w.checkpoint(&p, CheckpointMode::Restart) {
|
||||
Ok(IOResult::IO(io)) => {
|
||||
io.wait(db.io.as_ref()).unwrap();
|
||||
}
|
||||
Ok(IOResult::Done(_)) => {
|
||||
panic!("Checkpoint should not have succeeded");
|
||||
@@ -2430,7 +2400,7 @@ pub mod test {
|
||||
let result = {
|
||||
let pager = conn1.pager.borrow();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Restart)
|
||||
wal.checkpoint(&pager, CheckpointMode::Restart)
|
||||
};
|
||||
|
||||
assert!(
|
||||
@@ -2793,7 +2763,7 @@ pub mod test {
|
||||
{
|
||||
let pager = conn1.pager.borrow();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let result = wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Restart);
|
||||
let result = wal.checkpoint(&pager, CheckpointMode::Restart);
|
||||
|
||||
assert!(
|
||||
matches!(result, Err(LimboError::Busy)),
|
||||
@@ -2893,12 +2863,12 @@ pub mod test {
|
||||
{
|
||||
let pager = writer.pager.borrow();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
match wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Full) {
|
||||
Ok(IOResult::IO) => {
|
||||
match wal.checkpoint(&pager, CheckpointMode::Full) {
|
||||
Ok(IOResult::IO(io)) => {
|
||||
// Drive any pending IO (should quickly become Busy or Done)
|
||||
writer.run_once().unwrap();
|
||||
io.wait(db.io.as_ref()).unwrap();
|
||||
// Call again to see final state
|
||||
match wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Full) {
|
||||
match wal.checkpoint(&pager, CheckpointMode::Full) {
|
||||
Err(LimboError::Busy) => {}
|
||||
other => panic!("expected Busy from FULL with old reader, got {other:?}"),
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user