diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs
index 69bee24e3..e56221b65 100644
--- a/core/io/io_uring.rs
+++ b/core/io/io_uring.rs
@@ -490,7 +490,7 @@ impl WrappedIOUring {
             completion_from_key(user_data).error(CompletionError::ShortWrite);
             return;
         }
-        state.advance(written);
+        state.advance(written as u64);
 
         match state.remaining() {
             0 => {
@@ -569,22 +569,51 @@ impl IO for UringIO {
         Ok(())
     }
 
+    /// Drain calls `run_once` in a loop until the ring is empty.
+    /// To prevent mutex churn of checking if ring.empty() on each iteration, we violate DRY
     fn drain(&self) -> Result<()> {
         trace!("drain()");
+        let mut inner = self.inner.lock();
+        let ring = &mut inner.ring;
         loop {
-            {
-                let inner = self.inner.borrow();
-                if inner.ring.empty() {
-                    break;
+            ring.flush_overflow()?;
+            if ring.empty() {
+                return Ok(());
+            }
+            ring.submit_and_wait()?;
+            'inner: loop {
+                let Some(cqe) = ring.ring.completion().next() else {
+                    break 'inner;
+                };
+                ring.pending_ops -= 1;
+                let user_data = cqe.user_data();
+                if user_data == CANCEL_TAG {
+                    // ignore if this is a cancellation CQE
+                    continue 'inner;
+                }
+                let result = cqe.result();
+                turso_assert!(
+                    user_data != 0,
+                    "user_data must not be zero, we dont submit linked timeouts that would cause this"
+                );
+                if let Some(state) = ring.writev_states.remove(&user_data) {
+                    // if we have ongoing writev state, handle it separately and don't call completion
+                    ring.handle_writev_completion(state, user_data, result);
+                    continue 'inner;
+                }
+                if result < 0 {
+                    let errno = -result;
+                    let err = std::io::Error::from_raw_os_error(errno);
+                    completion_from_key(user_data).error(err.into());
+                } else {
+                    completion_from_key(user_data).complete(result)
                 }
             }
-            self.run_once()?;
         }
-        Ok(())
     }
 
     fn cancel(&self, completions: &[Completion]) -> Result<()> {
-        let mut inner = self.inner.borrow_mut();
+        let mut inner = self.inner.lock();
         for c in completions {
             c.abort();
             let e = io_uring::opcode::AsyncCancel::new(get_key(c.clone()))
diff --git a/core/storage/btree.rs b/core/storage/btree.rs
index 9f90f9472..a89341974 100644
--- a/core/storage/btree.rs
+++ b/core/storage/btree.rs
@@ -2633,20 +2633,22 @@ impl BTreeCursor {
             let current_sibling = sibling_pointer;
             let mut completions: Vec<Completion> = Vec::with_capacity(current_sibling + 1);
             for i in (0..=current_sibling).rev() {
-                let (page, c) =
-                    btree_read_page(&self.pager, pgno as usize).inspect_err(|_| {
-                        for c in completions.iter() {
-                            c.abort();
+                match btree_read_page(&self.pager, pgno as usize) {
+                    Err(e) => {
+                        tracing::error!("error reading page {}: {}", pgno, e);
+                        self.pager.io.cancel(&completions)?;
+                        self.pager.io.drain()?;
+                        return Err(e);
+                    }
+                    Ok((page, c)) => {
+                        // mark as dirty
+                        self.pager.add_dirty(&page);
+                        pages_to_balance[i].replace(page);
+                        if let Some(c) = c {
+                            completions.push(c);
                         }
-                    })?;
-                {
-                    // mark as dirty
-                    self.pager.add_dirty(&page);
+                    }
                 }
-                if let Some(c) = c {
-                    completions.push(c);
-                }
-                pages_to_balance[i].replace(page);
                 if i == 0 {
                     break;
                 }
diff --git a/core/storage/pager.rs b/core/storage/pager.rs
index 31eb980cd..c77aeb677 100644
--- a/core/storage/pager.rs
+++ b/core/storage/pager.rs
@@ -1275,34 +1275,36 @@ impl Pager {
             };
             pages.push(page);
             if pages.len() == IOV_MAX {
-                let c = wal
-                    .borrow_mut()
-                    .append_frames_vectored(
-                        std::mem::replace(
-                            &mut pages,
-                            Vec::with_capacity(std::cmp::min(IOV_MAX, dirty_pages.len() - idx)),
-                        ),
-                        page_sz,
-                        commit_frame,
-                    )
-                    .inspect_err(|_| {
-                        for c in completions.iter() {
-                            c.abort();
-                        }
-                    })?;
-                completions.push(c);
+                match wal.borrow_mut().append_frames_vectored(
+                    std::mem::replace(
+                        &mut pages,
+                        Vec::with_capacity(std::cmp::min(IOV_MAX, dirty_pages.len() - idx)),
+                    ),
+                    page_sz,
+                    commit_frame,
+                ) {
+                    Err(e) => {
+                        self.io.cancel(&completions)?;
+                        self.io.drain()?;
+                        return Err(e);
+                    }
+                    Ok(c) => completions.push(c),
+                }
             }
         }
         if !pages.is_empty() {
-            let c = wal
+            match wal
                 .borrow_mut()
                 .append_frames_vectored(pages, page_sz, commit_frame)
-                .inspect_err(|_| {
-                    for c in completions.iter() {
-                        c.abort();
-                    }
-                })?;
-            completions.push(c);
+            {
+                Ok(c) => completions.push(c),
+                Err(e) => {
+                    tracing::error!("cacheflush: error appending frames: {e}");
+                    self.io.cancel(&completions)?;
+                    self.io.drain()?;
+                    return Err(e);
+                }
+            }
         }
         Ok(completions)
     }
@@ -1379,9 +1381,7 @@ impl Pager {
             match r {
                 Ok(c) => completions.push(c),
                 Err(e) => {
-                    for c in &completions {
-                        c.abort();
-                    }
+                    self.io.cancel(&completions)?;
                     return Err(e);
                 }
             }
diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index 696d10d05..54854b516 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -1086,9 +1086,9 @@ pub fn write_pages_vectored(
                 if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
                     done.store(true, Ordering::Release);
                 }
-                for c in completions {
-                    c.abort();
-                }
+                pager.io.cancel(&completions)?;
+                // cancel any submitted completions and drain the IO before returning an error
+                pager.io.drain()?;
                 return Err(e);
             }
         }
diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs
index d8a325dae..c44c36b16 100644
--- a/core/vdbe/sorter.rs
+++ b/core/vdbe/sorter.rs
@@ -238,12 +238,15 @@ impl Sorter {
             InitChunkHeapState::Start => {
                 let mut completions: Vec<Completion> = Vec::with_capacity(self.chunks.len());
                 for chunk in self.chunks.iter_mut() {
-                    let c = chunk.read().inspect_err(|_| {
-                        for c in completions.iter() {
-                            c.abort();
+                    match chunk.read() {
+                        Err(e) => {
+                            tracing::error!("Failed to read chunk: {e}");
+                            self.io.cancel(&completions)?;
+                            self.io.drain()?;
+                            return Err(e);
                         }
-                    })?;
-                    completions.push(c);
+                        Ok(c) => completions.push(c),
+                    };
                 }
                 self.init_chunk_heap_state = InitChunkHeapState::PushChunk;
                 io_yield_many!(completions);