Merge 'Append WAL frames one by one' from Pere Diaz Bou

Let's make sure we don't end up in a weird situation by appending frames
one by one and we can later think of optimizations.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2034
This commit is contained in:
Pere Diaz Bou
2025-07-24 16:44:51 +02:00
5 changed files with 254 additions and 66 deletions

View File

@@ -805,11 +805,8 @@ impl Connection {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
let res = self._db.io.run_once();
if res.is_err() {
let state = self.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
self.pager.borrow().rollback(schema_did_change, self)?
}
if let Err(ref e) = res {
vdbe::handle_program_error(&self.pager.borrow(), self, e)?;
}
res
}
@@ -1216,8 +1213,24 @@ impl Statement {
if res.is_err() {
let state = self.program.connection.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
self.pager
.rollback(schema_did_change, &self.program.connection)?
if let Err(e) = self
.pager
.rollback(schema_did_change, &self.program.connection)
{
// Let's panic for now as we don't want to leave state in a bad state.
panic!("rollback failed: {e:?}");
}
let end_tx_res =
self.pager
.end_tx(true, schema_did_change, &self.program.connection, true)?;
self.program
.connection
.transaction_state
.set(TransactionState::None);
assert!(
matches!(end_tx_res, IOResult::Done(_)),
"end_tx should not return IO as it should just end txn without flushing anything. Got {end_tx_res:?}"
);
}
}
res

View File

@@ -204,7 +204,7 @@ impl DumbLruPageCache {
if clean_page {
entry_mut.page.clear_loaded();
debug!("cleaning up page {}", entry_mut.page.get().id);
debug!("clean(page={})", entry_mut.page.get().id);
let _ = entry_mut.page.get().contents.take();
}
self.unlink(entry);

View File

@@ -186,8 +186,10 @@ impl Page {
enum CacheFlushState {
/// Idle.
Start,
/// Waiting for all in-flight writes to the on-disk WAL to complete.
WaitAppendFrames,
/// Append a single frame to the WAL.
AppendFrame { current_page_to_append_idx: usize },
/// Wait for append frame to complete.
WaitAppendFrame { current_page_to_append_idx: usize },
}
#[derive(Clone, Copy, Debug)]
@@ -195,8 +197,11 @@ enum CacheFlushState {
enum CommitState {
/// Idle.
Start,
/// Waiting for all in-flight writes to the on-disk WAL to complete.
WaitAppendFrames,
/// Append a single frame to the WAL.
AppendFrame { current_page_to_append_idx: usize },
/// Wait for append frame to complete.
/// If the current page is the last page to append, sync wal and clear dirty pages and cache.
WaitAppendFrame { current_page_to_append_idx: usize },
/// Fsync the on-disk WAL.
SyncWal,
/// Checkpoint the WAL to the database file (if needed).
@@ -230,6 +235,8 @@ struct CommitInfo {
state: CommitState,
/// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync.
in_flight_writes: Rc<RefCell<usize>>,
/// Dirty pages to be flushed.
dirty_pages: Vec<usize>,
}
/// This will keep track of the state of current cache flush in order to not repeat work
@@ -237,6 +244,8 @@ struct FlushInfo {
state: CacheFlushState,
/// Number of writes taking place.
in_flight_writes: Rc<RefCell<usize>>,
/// Dirty pages to be flushed.
dirty_pages: Vec<usize>,
}
/// Track the state of the auto-vacuum mode.
@@ -394,6 +403,7 @@ impl Pager {
commit_info: RefCell::new(CommitInfo {
state: CommitState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
dirty_pages: Vec::new(),
}),
syncing: Rc::new(RefCell::new(false)),
checkpoint_state: RefCell::new(CheckpointState::Checkpoint),
@@ -408,6 +418,7 @@ impl Pager {
flush_info: RefCell::new(FlushInfo {
state: CacheFlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
dirty_pages: Vec::new(),
}),
free_page_state: RefCell::new(FreePageState::Start),
})
@@ -899,31 +910,86 @@ impl Pager {
trace!(?state);
match state {
CacheFlushState::Start => {
for page_id in self.dirty_pages.borrow().iter() {
let dirty_pages = self
.dirty_pages
.borrow()
.iter()
.copied()
.collect::<Vec<usize>>();
let mut flush_info = self.flush_info.borrow_mut();
if dirty_pages.is_empty() {
Ok(IOResult::Done(()))
} else {
flush_info.dirty_pages = dirty_pages;
flush_info.state = CacheFlushState::AppendFrame {
current_page_to_append_idx: 0,
};
Ok(IOResult::IO)
}
}
CacheFlushState::AppendFrame {
current_page_to_append_idx,
} => {
let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx];
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(*page_id);
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!("cacheflush(page={}, page_type={:?})", page_id, page_type);
self.wal.borrow_mut().append_frame(
page.clone(),
0,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
page.clear_dirty();
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrames;
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
self.wal.borrow_mut().append_frame(
page.clone(),
0,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrame {
current_page_to_append_idx,
};
return Ok(IOResult::IO);
}
CacheFlushState::WaitAppendFrames => {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = CacheFlushState::Start;
return Ok(IOResult::Done(()));
} else {
CacheFlushState::WaitAppendFrame {
current_page_to_append_idx,
} => {
let in_flight = self.flush_info.borrow().in_flight_writes.clone();
if *in_flight.borrow() > 0 {
return Ok(IOResult::IO);
}
// Clear dirty now
let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx];
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
page.clear_dirty();
// Continue with next page
let is_last_page =
current_page_to_append_idx == self.flush_info.borrow().dirty_pages.len() - 1;
if is_last_page {
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = CacheFlushState::Start;
Ok(IOResult::Done(()))
} else {
self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame {
current_page_to_append_idx: current_page_to_append_idx + 1,
};
Ok(IOResult::IO)
}
}
}
}
@@ -943,41 +1009,104 @@ impl Pager {
trace!(?state);
match state {
CommitState::Start => {
let db_size = header_accessor::get_database_size(self)?;
for (dirty_page_idx, page_id) in self.dirty_pages.borrow().iter().enumerate() {
let is_last_frame = dirty_page_idx == self.dirty_pages.borrow().len() - 1;
let dirty_pages = self
.dirty_pages
.borrow()
.iter()
.copied()
.collect::<Vec<usize>>();
let mut commit_info = self.commit_info.borrow_mut();
if dirty_pages.is_empty() {
return Ok(IOResult::Done(PagerCommitResult::WalWritten));
} else {
commit_info.dirty_pages = dirty_pages;
commit_info.state = CommitState::AppendFrame {
current_page_to_append_idx: 0,
};
}
}
CommitState::AppendFrame {
current_page_to_append_idx,
} => {
let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx];
let is_last_frame = current_page_to_append_idx
== self.commit_info.borrow().dirty_pages.len() - 1;
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(*page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).unwrap_or_else(|| {
panic!(
"we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}"
)
});
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
let db_size = if is_last_frame { db_size } else { 0 };
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
self.commit_info.borrow().in_flight_writes.clone(),
)?;
page.clear_dirty();
}
// This is okay assuming we use shared cache by default.
{
let mut cache = self.page_cache.write();
cache.clear().unwrap();
}
self.dirty_pages.borrow_mut().clear();
self.commit_info.borrow_mut().state = CommitState::WaitAppendFrames;
return Ok(IOResult::IO);
page
};
let db_size = {
let db_size = header_accessor::get_database_size(self)?;
if is_last_frame {
db_size
} else {
0
}
};
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
self.commit_info.borrow().in_flight_writes.clone(),
)?;
self.commit_info.borrow_mut().state = CommitState::WaitAppendFrame {
current_page_to_append_idx,
};
}
CommitState::WaitAppendFrames => {
let in_flight = *self.commit_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
CommitState::WaitAppendFrame {
current_page_to_append_idx,
} => {
let in_flight = self.commit_info.borrow().in_flight_writes.clone();
if *in_flight.borrow() > 0 {
return Ok(IOResult::IO);
}
// First clear dirty
let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx];
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).unwrap_or_else(|| {
panic!(
"we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}"
)
});
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
page.clear_dirty();
// Now advance to next page if there are more
let is_last_frame = current_page_to_append_idx
== self.commit_info.borrow().dirty_pages.len() - 1;
if is_last_frame {
// Let's clear the page cache now
{
let mut cache = self.page_cache.write();
cache.clear().unwrap();
}
self.dirty_pages.borrow_mut().clear();
self.commit_info.borrow_mut().state = CommitState::SyncWal;
} else {
return Ok(IOResult::IO);
self.commit_info.borrow_mut().state = CommitState::AppendFrame {
current_page_to_append_idx: current_page_to_append_idx + 1,
}
}
}
CommitState::SyncWal => {
@@ -1443,6 +1572,9 @@ impl Pager {
tracing::debug!(schema_did_change);
self.dirty_pages.borrow_mut().clear();
let mut cache = self.page_cache.write();
self.reset_internal_states();
cache.unset_dirty_all_pages();
cache.clear().expect("failed to clear page cache");
if schema_did_change {
@@ -1452,6 +1584,22 @@ impl Pager {
Ok(())
}
fn reset_internal_states(&self) {
self.checkpoint_state.replace(CheckpointState::Checkpoint);
self.checkpoint_inflight.replace(0);
self.syncing.replace(false);
self.flush_info.replace(FlushInfo {
state: CacheFlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
dirty_pages: Vec::new(),
});
self.commit_info.replace(CommitInfo {
state: CommitState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
dirty_pages: Vec::new(),
});
}
}
pub fn allocate_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {

View File

@@ -1040,6 +1040,7 @@ impl Wal for WalFile {
}
self.last_checksum = shared.last_checksum;
}
self.reset_internal_states();
Ok(())
}
@@ -1130,6 +1131,15 @@ impl WalFile {
}
}
}
fn reset_internal_states(&mut self) {
self.ongoing_checkpoint.state = CheckpointState::Start;
self.ongoing_checkpoint.min_frame = 0;
self.ongoing_checkpoint.max_frame = 0;
self.ongoing_checkpoint.current_page = 0;
self.sync_state.set(SyncState::NotSyncing);
self.syncing.set(false);
}
}
impl WalFileShared {

View File

@@ -413,17 +413,8 @@ impl Program {
Ok(InsnFunctionStepResult::Interrupt) => return Ok(StepResult::Interrupt),
Ok(InsnFunctionStepResult::Busy) => return Ok(StepResult::Busy),
Err(err) => {
match err {
LimboError::TxError(_) => {}
_ => {
let state = self.connection.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
pager.rollback(schema_did_change, &self.connection)?
}
}
}
let err = Err(err);
return err;
handle_program_error(&pager, &self.connection, &err)?;
return Err(err);
}
}
}
@@ -751,3 +742,29 @@ impl Row {
self.count
}
}
/// Handle a program error by rolling back the transaction
pub fn handle_program_error(
pager: &Rc<Pager>,
connection: &Connection,
err: &LimboError,
) -> Result<()> {
match err {
LimboError::TxError(_) => {}
_ => {
let state = connection.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
if let Err(e) = pager.rollback(schema_did_change, connection) {
tracing::error!("rollback failed: {e}");
}
if let Err(e) = pager.end_tx(false, schema_did_change, connection, false) {
tracing::error!("end_tx failed: {e}");
}
} else if let Err(e) = pager.end_read_tx() {
tracing::error!("end_read_tx failed: {e}");
}
connection.transaction_state.replace(TransactionState::None);
}
}
Ok(())
}