review fixes

This commit is contained in:
Pere Diaz Bou
2025-07-23 11:59:54 +02:00
parent 75f9c23ed3
commit b07e57d9d1
3 changed files with 92 additions and 96 deletions

View File

@@ -805,11 +805,8 @@ impl Connection {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
let res = self._db.io.run_once();
if res.is_err() {
let state = self.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
self.pager.borrow().rollback(schema_did_change, self)?
}
if let Err(ref e) = res {
vdbe::handle_program_error(&self.pager.borrow(), self, e)?;
}
res
}

View File

@@ -953,36 +953,36 @@ impl Pager {
current_page_to_append_idx,
} => {
let in_flight = self.flush_info.borrow().in_flight_writes.clone();
if *in_flight.borrow() == 0 {
// Clear dirty now
let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx];
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
page.clear_dirty();
// Continue with next page
let is_last_page = current_page_to_append_idx
== self.flush_info.borrow().dirty_pages.len() - 1;
if is_last_page {
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = CacheFlushState::Start;
Ok(IOResult::Done(()))
} else {
self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame {
current_page_to_append_idx: current_page_to_append_idx + 1,
};
Ok(IOResult::IO)
}
if *in_flight.borrow() > 0 {
return Ok(IOResult::IO);
}
// Clear dirty now
let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx];
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
page.clear_dirty();
// Continue with next page
let is_last_page =
current_page_to_append_idx == self.flush_info.borrow().dirty_pages.len() - 1;
if is_last_page {
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = CacheFlushState::Start;
Ok(IOResult::Done(()))
} else {
self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame {
current_page_to_append_idx: current_page_to_append_idx + 1,
};
Ok(IOResult::IO)
}
}
@@ -1064,46 +1064,44 @@ impl Pager {
current_page_to_append_idx,
} => {
let in_flight = self.commit_info.borrow().in_flight_writes.clone();
if *in_flight.borrow() == 0 {
// First clear dirty
let page_id =
self.commit_info.borrow().dirty_pages[current_page_to_append_idx];
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).unwrap_or_else(|| {
if *in_flight.borrow() > 0 {
return Ok(IOResult::IO);
}
// First clear dirty
let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx];
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).unwrap_or_else(|| {
panic!(
"we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}"
)
});
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
page.clear_dirty();
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
page.clear_dirty();
// Now advance to next page if there are more
let is_last_frame = current_page_to_append_idx
== self.commit_info.borrow().dirty_pages.len() - 1;
if is_last_frame {
// Let's clear the page cache now
{
let mut cache = self.page_cache.write();
cache.clear().unwrap();
}
self.dirty_pages.borrow_mut().clear();
self.commit_info.borrow_mut().state = CommitState::SyncWal;
} else {
self.commit_info.borrow_mut().state = CommitState::AppendFrame {
current_page_to_append_idx: current_page_to_append_idx + 1,
}
// Now advance to next page if there are more
let is_last_frame = current_page_to_append_idx
== self.commit_info.borrow().dirty_pages.len() - 1;
if is_last_frame {
// Let's clear the page cache now
{
let mut cache = self.page_cache.write();
cache.clear().unwrap();
}
self.dirty_pages.borrow_mut().clear();
self.commit_info.borrow_mut().state = CommitState::SyncWal;
} else {
return Ok(IOResult::IO);
self.commit_info.borrow_mut().state = CommitState::AppendFrame {
current_page_to_append_idx: current_page_to_append_idx + 1,
}
}
}
CommitState::SyncWal => {

View File

@@ -413,35 +413,8 @@ impl Program {
Ok(InsnFunctionStepResult::Interrupt) => return Ok(StepResult::Interrupt),
Ok(InsnFunctionStepResult::Busy) => return Ok(StepResult::Busy),
Err(err) => {
match err {
LimboError::TxError(_) => {}
_ => {
let state = self.connection.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
if let Err(e) = pager.rollback(schema_did_change, &self.connection)
{
tracing::error!("rollback failed: {e}");
}
if let Err(e) =
pager.end_tx(false, schema_did_change, &self.connection, false)
{
tracing::error!("end_tx failed: {e}");
}
self.connection
.transaction_state
.replace(TransactionState::None);
} else {
if let Err(e) = pager.end_read_tx() {
tracing::error!("end_read_tx failed: {e}");
}
self.connection
.transaction_state
.replace(TransactionState::None);
}
}
}
let err = Err(err);
return err;
handle_program_error(&pager, &self.connection, &err)?;
return Err(err);
}
}
}
@@ -769,3 +742,31 @@ impl Row {
self.count
}
}
/// Handle a program error by rolling back the transaction
pub fn handle_program_error(
pager: &Rc<Pager>,
connection: &Connection,
err: &LimboError,
) -> Result<()> {
match err {
LimboError::TxError(_) => {}
_ => {
let state = connection.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
if let Err(e) = pager.rollback(schema_did_change, connection) {
tracing::error!("rollback failed: {e}");
}
if let Err(e) = pager.end_tx(false, schema_did_change, connection, false) {
tracing::error!("end_tx failed: {e}");
}
} else {
if let Err(e) = pager.end_read_tx() {
tracing::error!("end_read_tx failed: {e}");
}
}
connection.transaction_state.replace(TransactionState::None);
}
}
Ok(())
}