Replace verbose completions with new helpers

This commit is contained in:
PThorpe92
2025-07-17 23:47:21 -04:00
parent 44d7570272
commit dced94aec6
3 changed files with 20 additions and 32 deletions

View File

@@ -7434,8 +7434,7 @@ mod tests {
header_accessor::get_page_size(&pager)? as usize,
drop_fn,
)));
let write_complete = Box::new(|_| {});
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
let c = Completion::new_write(|_| {});
#[allow(clippy::arc_with_non_send_sync)]
pager
.db_file

View File

@@ -49,9 +49,7 @@ use super::pager::PageRef;
use super::wal::LimboRwLock;
use crate::error::LimboError;
use crate::fast_lock::SpinLock;
use crate::io::{
Buffer, Complete, Completion, CompletionType, ReadCompletion, SyncCompletion, WriteCompletion,
};
use crate::io::{Buffer, Complete, Completion};
use crate::storage::btree::offset::{
BTREE_CELL_CONTENT_AREA, BTREE_CELL_COUNT, BTREE_FIRST_FREEBLOCK, BTREE_FRAGMENTED_BYTES_COUNT,
BTREE_PAGE_TYPE, BTREE_RIGHTMOST_PTR,
@@ -747,7 +745,7 @@ pub fn begin_read_page(
page.set_error();
}
});
let c = Completion::new(CompletionType::Read(ReadCompletion::new(buf, complete)));
let c = Completion::new_read(buf, complete);
db_file.read_page(page_idx, c)?;
Ok(())
}
@@ -808,7 +806,7 @@ pub fn begin_write_btree_page(
);
})
};
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
let c = Completion::new_write(write_complete);
let res = page_source.write_page(page_id, buffer.clone(), c);
if res.is_err() {
// Avoid infinite loop if write page fails
@@ -821,11 +819,9 @@ pub fn begin_write_btree_page(
pub fn begin_sync(db_file: Arc<dyn DatabaseStorage>, syncing: Rc<RefCell<bool>>) -> Result<()> {
assert!(!*syncing.borrow());
*syncing.borrow_mut() = true;
let completion = Completion::new(CompletionType::Sync(SyncCompletion {
complete: Box::new(move |_| {
*syncing.borrow_mut() = false;
}),
}));
let completion = Completion::new_sync(move |_| {
*syncing.borrow_mut() = false;
});
#[allow(clippy::arc_with_non_send_sync)]
db_file.sync(completion)?;
Ok(())
@@ -1486,10 +1482,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
wfs_data.last_checksum = cumulative_checksum;
wfs_data.loaded.store(true, Ordering::SeqCst);
});
let c = Completion::new(CompletionType::Read(ReadCompletion::new(
buf_for_pread,
complete,
)));
let c = Completion::new_read(buf_for_pread, complete);
file.pread(0, c)?;
Ok(wal_file_shared_ret)
@@ -1509,7 +1502,7 @@ pub fn begin_read_wal_frame(
});
let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn)));
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new(CompletionType::Read(ReadCompletion::new(buf, complete)));
let c = Completion::new_read(buf, complete);
let c = io.pread(offset, c)?;
Ok(c)
}
@@ -1603,7 +1596,7 @@ pub fn begin_write_wal_frame(
})
};
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
let c = Completion::new_write(write_complete);
let res = io.pwrite(offset, buffer.clone(), c);
if res.is_err() {
// If we do not reduce the counter here on error, we incur an infinite loop when cacheflushing
@@ -1635,16 +1628,14 @@ pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<
Arc::new(RefCell::new(buffer))
};
let write_complete = {
Box::new(move |bytes_written: i32| {
turso_assert!(
bytes_written == WAL_HEADER_SIZE as i32,
"wal header wrote({bytes_written}) != expected({WAL_HEADER_SIZE})"
);
})
let write_complete = move |bytes_written: i32| {
turso_assert!(
bytes_written == WAL_HEADER_SIZE as i32,
"wal header wrote({bytes_written}) != expected({WAL_HEADER_SIZE})"
);
};
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
let c = Completion::new_write(write_complete);
io.pwrite(0, buffer.clone(), c)?;
Ok(())
}

View File

@@ -883,12 +883,10 @@ impl Wal for WalFile {
tracing::debug!("wal_sync");
let syncing = self.syncing.clone();
self.syncing.set(true);
let completion = Completion::new(CompletionType::Sync(SyncCompletion {
complete: Box::new(move |_| {
tracing::debug!("wal_sync finish");
syncing.set(false);
}),
}));
let completion = Completion::new_sync(move |_| {
tracing::debug!("wal_sync finish");
syncing.set(false);
});
let shared = self.get_shared();
shared.file.sync(completion)?;
self.sync_state.set(SyncState::Syncing);