Change callsites to cancel any further IO and drain pending operations when an error occurs

This commit is contained in:
PThorpe92
2025-08-25 16:23:00 -04:00
parent a750505762
commit ccae3ab0f2
5 changed files with 88 additions and 54 deletions

View File

@@ -490,7 +490,7 @@ impl WrappedIOUring {
completion_from_key(user_data).error(CompletionError::ShortWrite);
return;
}
state.advance(written);
state.advance(written as u64);
match state.remaining() {
0 => {
@@ -569,22 +569,51 @@ impl IO for UringIO {
Ok(())
}
/// Drain calls `run_once` in a loop until the ring is empty.
/// To avoid mutex churn from checking `ring.empty()` on each iteration, we duplicate the
/// completion-processing loop body here instead of calling `run_once`, knowingly violating DRY.
fn drain(&self) -> Result<()> {
trace!("drain()");
let mut inner = self.inner.lock();
let ring = &mut inner.ring;
loop {
{
let inner = self.inner.borrow();
if inner.ring.empty() {
break;
ring.flush_overflow()?;
if ring.empty() {
return Ok(());
}
ring.submit_and_wait()?;
'inner: loop {
let Some(cqe) = ring.ring.completion().next() else {
break 'inner;
};
ring.pending_ops -= 1;
let user_data = cqe.user_data();
if user_data == CANCEL_TAG {
// ignore if this is a cancellation CQE
continue 'inner;
}
let result = cqe.result();
turso_assert!(
user_data != 0,
"user_data must not be zero, we dont submit linked timeouts that would cause this"
);
if let Some(state) = ring.writev_states.remove(&user_data) {
// if we have ongoing writev state, handle it separately and don't call completion
ring.handle_writev_completion(state, user_data, result);
continue 'inner;
}
if result < 0 {
let errno = -result;
let err = std::io::Error::from_raw_os_error(errno);
completion_from_key(user_data).error(err.into());
} else {
completion_from_key(user_data).complete(result)
}
}
self.run_once()?;
}
Ok(())
}
fn cancel(&self, completions: &[Completion]) -> Result<()> {
let mut inner = self.inner.borrow_mut();
let mut inner = self.inner.lock();
for c in completions {
c.abort();
let e = io_uring::opcode::AsyncCancel::new(get_key(c.clone()))

View File

@@ -2633,20 +2633,22 @@ impl BTreeCursor {
let current_sibling = sibling_pointer;
let mut completions: Vec<Completion> = Vec::with_capacity(current_sibling + 1);
for i in (0..=current_sibling).rev() {
let (page, c) =
btree_read_page(&self.pager, pgno as usize).inspect_err(|_| {
for c in completions.iter() {
c.abort();
match btree_read_page(&self.pager, pgno as usize) {
Err(e) => {
tracing::error!("error reading page {}: {}", pgno, e);
self.pager.io.cancel(&completions)?;
self.pager.io.drain()?;
return Err(e);
}
Ok((page, c)) => {
// mark as dirty
self.pager.add_dirty(&page);
pages_to_balance[i].replace(page);
if let Some(c) = c {
completions.push(c);
}
})?;
{
// mark as dirty
self.pager.add_dirty(&page);
}
}
if let Some(c) = c {
completions.push(c);
}
pages_to_balance[i].replace(page);
if i == 0 {
break;
}

View File

@@ -1275,34 +1275,36 @@ impl Pager {
};
pages.push(page);
if pages.len() == IOV_MAX {
let c = wal
.borrow_mut()
.append_frames_vectored(
std::mem::replace(
&mut pages,
Vec::with_capacity(std::cmp::min(IOV_MAX, dirty_pages.len() - idx)),
),
page_sz,
commit_frame,
)
.inspect_err(|_| {
for c in completions.iter() {
c.abort();
}
})?;
completions.push(c);
match wal.borrow_mut().append_frames_vectored(
std::mem::replace(
&mut pages,
Vec::with_capacity(std::cmp::min(IOV_MAX, dirty_pages.len() - idx)),
),
page_sz,
commit_frame,
) {
Err(e) => {
self.io.cancel(&completions)?;
self.io.drain()?;
return Err(e);
}
Ok(c) => completions.push(c),
}
}
}
if !pages.is_empty() {
let c = wal
match wal
.borrow_mut()
.append_frames_vectored(pages, page_sz, commit_frame)
.inspect_err(|_| {
for c in completions.iter() {
c.abort();
}
})?;
completions.push(c);
{
Ok(c) => completions.push(c),
Err(e) => {
tracing::error!("cacheflush: error appending frames: {e}");
self.io.cancel(&completions)?;
self.io.drain()?;
return Err(e);
}
}
}
Ok(completions)
}
@@ -1379,9 +1381,7 @@ impl Pager {
match r {
Ok(c) => completions.push(c),
Err(e) => {
for c in &completions {
c.abort();
}
self.io.cancel(&completions)?;
return Err(e);
}
}

View File

@@ -1086,9 +1086,9 @@ pub fn write_pages_vectored(
if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
done.store(true, Ordering::Release);
}
for c in completions {
c.abort();
}
pager.io.cancel(&completions)?;
// cancel any submitted completions and drain the IO before returning an error
pager.io.drain()?;
return Err(e);
}
}

View File

@@ -238,12 +238,15 @@ impl Sorter {
InitChunkHeapState::Start => {
let mut completions: Vec<Completion> = Vec::with_capacity(self.chunks.len());
for chunk in self.chunks.iter_mut() {
let c = chunk.read().inspect_err(|_| {
for c in completions.iter() {
c.abort();
match chunk.read() {
Err(e) => {
tracing::error!("Failed to read chunk: {e}");
self.io.cancel(&completions)?;
self.io.drain()?;
return Err(e);
}
})?;
completions.push(c);
Ok(c) => completions.push(c),
};
}
self.init_chunk_heap_state = InitChunkHeapState::PushChunk;
io_yield_many!(completions);