abort completions on error

This commit is contained in:
pedrocarlo
2025-08-14 21:07:24 -03:00
parent 4dca1c00db
commit de1811dea7
6 changed files with 47 additions and 19 deletions

View File

@@ -79,7 +79,7 @@ pub enum LimboError {
PlanningError(String),
}
// We only propagate the error kind
// We only propagate the error kind so we can avoid string allocation in hot path and copying/cloning enums is cheaper
impl From<std::io::Error> for LimboError {
fn from(value: std::io::Error) -> Self {
Self::CompletionError(CompletionError::IOError(value.kind()))

View File

@@ -50,8 +50,10 @@ pub trait File: Send + Sync {
})
};
if let Err(e) = self.pwrite(pos, buf.clone(), child_c) {
// best-effort: mark as done so caller won't wait forever
c.complete(-1);
// best-effort: mark as abort so caller won't wait forever
// TODO: when we have `pwrite` and other I/O methods return CompletionError
// instead of LimboError, store the error inside
c.abort();
return Err(e);
}
pos += len;

View File

@@ -2547,9 +2547,14 @@ impl BTreeCursor {
// start loading right page first
let mut pgno: u32 = unsafe { right_pointer.cast::<u32>().read().swap_bytes() };
let current_sibling = sibling_pointer;
let mut completions = Vec::with_capacity(current_sibling + 1);
let mut completions: Vec<Completion> = Vec::with_capacity(current_sibling + 1);
for i in (0..=current_sibling).rev() {
let (page, c) = btree_read_page(&self.pager, pgno as usize)?;
let (page, c) =
btree_read_page(&self.pager, pgno as usize).inspect_err(|_| {
for c in completions.iter() {
c.abort();
}
})?;
{
// mark as dirty
let sibling_page = page.get();

View File

@@ -1124,7 +1124,7 @@ impl Pager {
.iter()
.copied()
.collect::<Vec<usize>>();
let mut completions = Vec::with_capacity(dirty_pages.len());
let mut completions: Vec<Completion> = Vec::with_capacity(dirty_pages.len());
for page_id in dirty_pages {
let page = {
let mut cache = self.page_cache.write();
@@ -1138,11 +1138,18 @@ impl Pager {
);
page
};
let c = wal.borrow_mut().append_frame(
page.clone(),
self.page_size.get().expect("page size not set"),
0,
)?;
let c = wal
.borrow_mut()
.append_frame(
page.clone(),
self.page_size.get().expect("page size not set"),
0,
)
.inspect_err(|_| {
for c in completions.iter() {
c.abort();
}
})?;
// TODO: invalidade previous completions if this one fails
completions.push(c);
}
@@ -1176,7 +1183,7 @@ impl Pager {
.get()
};
let dirty_len = self.dirty_pages.borrow().iter().len();
let mut completions = Vec::with_capacity(dirty_len);
let mut completions: Vec<Completion> = Vec::with_capacity(dirty_len);
for (curr_page_idx, page_id) in
self.dirty_pages.borrow().iter().copied().enumerate()
{
@@ -1202,11 +1209,18 @@ impl Pager {
};
// TODO: invalidade previous completions on error here
let c = wal.borrow_mut().append_frame(
page.clone(),
self.page_size.get().expect("page size not set"),
db_size,
)?;
let c = wal
.borrow_mut()
.append_frame(
page.clone(),
self.page_size.get().expect("page size not set"),
db_size,
)
.inspect_err(|_| {
for c in completions.iter() {
c.abort();
}
})?;
completions.push(c);
}
self.dirty_pages.borrow_mut().clear();

View File

@@ -1047,6 +1047,9 @@ pub fn write_pages_vectored(
if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
done.store(true, Ordering::Release);
}
for c in completions {
c.abort();
}
return Err(e);
}
}

View File

@@ -236,9 +236,13 @@ impl Sorter {
fn init_chunk_heap(&mut self) -> Result<IOResult<()>> {
match self.init_chunk_heap_state {
InitChunkHeapState::Start => {
let mut completions = Vec::with_capacity(self.chunks.len());
let mut completions: Vec<Completion> = Vec::with_capacity(self.chunks.len());
for chunk in self.chunks.iter_mut() {
let c = chunk.read()?;
let c = chunk.read().inspect_err(|_| {
for c in completions.iter() {
c.abort();
}
})?;
completions.push(c);
}
self.init_chunk_heap_state = InitChunkHeapState::PushChunk;