diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3d5f6cca9..50a273e7b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -50,7 +50,7 @@ jobs: - uses: actions/checkout@v3 - name: Clippy run: | - cargo clippy --workspace --all-features --all-targets -- --deny=warnings + cargo clippy --workspace --all-features --all-targets --exclude limbo-wasm -- -A unused-variables --deny=warnings simulator: runs-on: blacksmith-4vcpu-ubuntu-2404 diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index fa5ec517b..45ce3b958 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -693,19 +693,19 @@ impl DatabaseFile { } impl turso_core::DatabaseStorage for DatabaseFile { - fn read_page(&self, page_idx: usize, c: turso_core::Completion) -> turso_core::Result<()> { - let r = match c.completion_type { - turso_core::CompletionType::Read(ref r) => r, - _ => unreachable!(), - }; + fn read_page( + &self, + page_idx: usize, + c: turso_core::Completion, + ) -> turso_core::Result { + let r = c.as_read(); let size = r.buf().len(); assert!(page_idx > 0); if !(512..=65536).contains(&size) || size & (size - 1) != 0 { return Err(turso_core::LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c.into())?; - Ok(()) + self.file.pread(pos, c) } fn write_page( @@ -713,16 +713,14 @@ impl turso_core::DatabaseStorage for DatabaseFile { page_idx: usize, buffer: Arc>, c: turso_core::Completion, - ) -> turso_core::Result<()> { + ) -> turso_core::Result { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; - self.file.pwrite(pos, buffer, c.into())?; - Ok(()) + self.file.pwrite(pos, buffer, c) } - fn sync(&self, c: turso_core::Completion) -> turso_core::Result<()> { - let _ = self.file.sync(c.into())?; - Ok(()) + fn sync(&self, c: turso_core::Completion) -> turso_core::Result { + self.file.sync(c) } fn size(&self) -> turso_core::Result { diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index a4f758308..3abaed46e 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -266,7 +266,7 @@ impl Connection { .inner .lock() .map_err(|e| Error::MutexError(e.to_string()))?; - conn.cacheflush()?; + let res = conn.cacheflush()?; Ok(()) } diff --git a/core/io/generic.rs b/core/io/generic.rs index ebbd67d17..15ce52564 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -35,7 +35,7 @@ impl IO for GenericIO { })) } - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } @@ -86,14 +86,11 @@ impl File for GenericFile { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { - let r = match c.completion_type { - CompletionType::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.as_read(); let mut buf = r.buf_mut(); let buf = buf.as_mut_slice(); file.read_exact(buf)?; @@ -106,8 +103,8 @@ impl File for GenericFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let buf = buffer.borrow(); @@ -117,7 +114,7 @@ impl File for GenericFile { Ok(c) } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { let mut file = self.file.borrow_mut(); file.sync_all().map_err(|err| LimboError::IOError(err))?; c.complete(0); diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index aa0f21f3c..2ccb3e30a 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -1,6 +1,6 @@ #![allow(clippy::arc_with_non_send_sync)] -use super::{common, Completion, File, OpenFlags, IO}; +use super::{common, Completion, CompletionInner, File, OpenFlags, IO}; use crate::io::clock::{Clock, Instant}; use crate::{turso_assert, LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; @@ -168,7 +168,7 @@ impl IO for UringIO { Ok(uring_file) } - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } @@ -225,14 +225,15 @@ impl Clock for UringIO { #[inline(always)] /// use the callback pointer as the user_data for the operation as is /// common practice for io_uring to prevent more indirection -fn get_key(c: Arc) -> u64 { - Arc::into_raw(c) as u64 +fn get_key(c: Completion) -> u64 { + Arc::into_raw(c.inner) as u64 } #[inline(always)] -/// convert the user_data back to an Arc pointer -fn completion_from_key(key: u64) -> Arc { - unsafe { Arc::from_raw(key as *const Completion) } +/// convert the user_data back to an Completion pointer +fn completion_from_key(key: u64) -> Completion { + let c_inner = unsafe { Arc::from_raw(key as *const CompletionInner) }; + Completion { inner: c_inner } } pub struct UringFile { @@ -297,7 +298,7 @@ impl File for UringFile { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let r = c.as_read(); trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let mut io = self.io.borrow_mut(); @@ -320,8 +321,8 @@ impl File for UringFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let mut io = self.io.borrow_mut(); let write = { let buf = buffer.borrow(); @@ -337,7 +338,7 @@ impl File for UringFile { Ok(c) } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { let mut io = self.io.borrow_mut(); trace!("sync()"); let sync = with_fd!(self, |fd| { diff --git a/core/io/memory.rs b/core/io/memory.rs index 597696605..ac81b39bc 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -53,7 +53,7 @@ impl IO for MemoryIO { Ok(()) } - fn wait_for_completion(&self, _c: Arc) -> Result<()> { + fn wait_for_completion(&self, _c: Completion) -> Result<()> { todo!(); } @@ -83,7 +83,7 @@ impl File for MemoryFile { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let r = c.as_read(); let buf_len = r.buf().len(); if buf_len == 0 { @@ -128,8 +128,8 @@ impl File for MemoryFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let buf = buffer.borrow(); let buf_len = buf.len(); if buf_len == 0 { @@ -165,7 +165,7 @@ impl File for MemoryFile { Ok(c) } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { // no-op c.complete(0); Ok(c) diff --git a/core/io/mod.rs b/core/io/mod.rs index ee757d972..b58c17ab6 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -14,14 +14,10 @@ use std::{ pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; - fn pread(&self, pos: usize, c: Arc) -> Result>; - fn pwrite( - &self, - pos: usize, - buffer: Arc>, - c: Arc, - ) -> Result>; - fn sync(&self, c: Arc) -> Result>; + fn pread(&self, pos: usize, c: Completion) -> Result; + fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) + -> Result; + fn sync(&self, c: Completion) -> Result; fn size(&self) -> Result; } @@ -47,7 +43,7 @@ pub trait IO: Clock + Send + Sync { fn run_once(&self) -> Result<()>; - fn wait_for_completion(&self, c: Arc) -> Result<()>; + fn wait_for_completion(&self, c: Completion) -> Result<()>; fn generate_random_number(&self) -> i64; @@ -58,7 +54,13 @@ pub type Complete = dyn Fn(Arc>, i32); pub type WriteComplete = dyn Fn(i32); pub type SyncComplete = dyn Fn(i32); +#[must_use] +#[derive(Clone)] pub struct Completion { + inner: Arc, +} + +struct CompletionInner { pub completion_type: CompletionType, is_completed: Cell, } @@ -77,8 +79,10 @@ pub struct ReadCompletion { impl Completion { pub fn new(completion_type: CompletionType) -> Self { Self { - completion_type, - is_completed: Cell::new(false), + inner: Arc::new(CompletionInner { + completion_type, + is_completed: Cell::new(false), + }), } } @@ -110,26 +114,35 @@ impl Completion { } pub fn is_completed(&self) -> bool { - self.is_completed.get() + self.inner.is_completed.get() } pub fn complete(&self, result: i32) { - match &self.completion_type { + match &self.inner.completion_type { CompletionType::Read(r) => r.complete(result), CompletionType::Write(w) => w.complete(result), CompletionType::Sync(s) => s.complete(result), // fix }; - self.is_completed.set(true); + self.inner.is_completed.set(true); } /// only call this method if you are sure that the completion is /// a ReadCompletion, panics otherwise pub fn as_read(&self) -> &ReadCompletion { - match self.completion_type { + match self.inner.completion_type { CompletionType::Read(ref r) => r, _ => unreachable!(), } } + + /// only call this method if you are sure that the completion is + /// a WriteCompletion, panics otherwise + pub fn as_write(&self) -> &WriteCompletion { + match self.inner.completion_type { + CompletionType::Write(ref w) => w, + _ => unreachable!(), + } + } } pub struct WriteCompletion { diff --git a/core/io/unix.rs b/core/io/unix.rs index 1526e13e4..cfe6d37fe 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -286,7 +286,7 @@ impl IO for UnixIO { Ok(()) } - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } @@ -305,10 +305,10 @@ impl IO for UnixIO { } enum CompletionCallback { - Read(Arc>, Arc, usize), + Read(Arc>, Completion, usize), Write( Arc>, - Arc, + Completion, Arc>, usize, ), @@ -364,7 +364,7 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::TRACE)] - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let file = self.file.lock().unwrap(); let result = { let r = c.as_read(); @@ -401,8 +401,8 @@ impl File for UnixFile<'_> { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let file = self.file.lock().unwrap(); let result = { let buf = buffer.borrow(); @@ -432,7 +432,7 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::TRACE)] - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { let file = self.file.lock().unwrap(); let result = fs::fsync(file.as_fd()); match result { diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 4df856eed..545ce61a1 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -1,7 +1,6 @@ use super::{Buffer, Completion, File, MemoryIO, OpenFlags, IO}; use crate::ext::VfsMod; use crate::io::clock::{Clock, Instant}; -use crate::io::CompletionType; use crate::{LimboError, Result}; use std::cell::RefCell; use std::ffi::{c_void, CString}; @@ -44,7 +43,7 @@ impl IO for VfsMod { Ok(()) } - fn wait_for_completion(&self, _c: Arc) -> Result<()> { + fn wait_for_completion(&self, _c: Completion) -> Result<()> { todo!(); } @@ -98,11 +97,8 @@ impl File for VfsFileImpl { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result> { - let r = match c.completion_type { - CompletionType::Read(ref r) => r, - _ => unreachable!(), - }; + fn pread(&self, pos: usize, c: Completion) -> Result { + let r = c.as_read(); let result = { let mut buf = r.buf_mut(); let count = buf.len(); @@ -121,8 +117,8 @@ impl File for VfsFileImpl { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let buf = buffer.borrow(); let count = buf.as_slice().len(); if self.vfs.is_null() { @@ -146,7 +142,7 @@ impl File for VfsFileImpl { } } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { let vfs = unsafe { &*self.vfs }; let result = unsafe { (vfs.sync)(self.file) }; if result < 0 { diff --git a/core/io/windows.rs b/core/io/windows.rs index 6ffdc005b..701e61269 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -33,7 +33,7 @@ impl IO for WindowsIO { } #[instrument(err, skip_all, level = Level::TRACE)] - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } @@ -84,7 +84,7 @@ impl File for WindowsFile { } #[instrument(skip(self, c), level = Level::TRACE)] - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let mut file = self.file.write(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let nr = { @@ -103,8 +103,8 @@ impl File for WindowsFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let mut file = self.file.write(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let buf = buffer.borrow(); @@ -115,7 +115,7 @@ impl File for WindowsFile { } #[instrument(err, skip_all, level = Level::TRACE)] - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { let file = self.file.write(); file.sync_all().map_err(LimboError::IOError)?; c.complete(0); diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 32f512324..bf3277f61 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -2443,7 +2443,7 @@ impl BTreeCursor { } if !self.stack.has_parent() { - self.balance_root()?; + let res = self.balance_root()?; } let write_info = self.state.mut_write_info().unwrap(); @@ -5256,7 +5256,8 @@ impl BTreeCursor { let new_payload = &mut *new_payload; // if it all fits in local space and old_local_size is enough, do an in-place overwrite if new_payload.len() == *old_local_size { - self.overwrite_content(page_ref.clone(), *old_offset, new_payload)?; + let res = + self.overwrite_content(page_ref.clone(), *old_offset, new_payload)?; return Ok(IOResult::Done(())); } @@ -7751,7 +7752,7 @@ mod tests { tracing::info!("seed: {seed}"); for i in 0..inserts { pager.begin_read_tx().unwrap(); - pager.begin_write_tx().unwrap(); + let res = pager.begin_write_tx().unwrap(); let key = { let result; loop { @@ -7921,7 +7922,7 @@ mod tests { for i in 0..operations { let print_progress = i % 100 == 0; pager.begin_read_tx().unwrap(); - pager.begin_write_tx().unwrap(); + let res = pager.begin_write_tx().unwrap(); // Decide whether to insert or delete (80% chance of insert) let is_insert = rng.next_u64() % 100 < (insert_chance * 100.0) as u64; @@ -8302,7 +8303,7 @@ mod tests { let _ = run_until_done(|| pager.allocate_page1(), &pager); for _ in 0..(database_size - 1) { - pager.allocate_page().unwrap(); + let res = pager.allocate_page().unwrap(); } header_accessor::set_page_size(&pager, page_size).unwrap(); @@ -8334,7 +8335,7 @@ mod tests { ))); let c = Completion::new_write(|_| {}); #[allow(clippy::arc_with_non_send_sync)] - pager + let c = pager .db_file .write_page(current_page as usize, buf.clone(), c)?; pager.io.run_once()?; diff --git a/core/storage/database.rs b/core/storage/database.rs index 93841ee3e..8e539a5f3 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -1,5 +1,4 @@ use crate::error::LimboError; -use crate::io::CompletionType; use crate::{io::Completion, Buffer, Result}; use std::{cell::RefCell, sync::Arc}; use tracing::{instrument, Level}; @@ -10,14 +9,14 @@ use tracing::{instrument, Level}; /// the storage medium. A database can either be a file on disk, like in SQLite, /// or something like a remote page server service. pub trait DatabaseStorage: Send + Sync { - fn read_page(&self, page_idx: usize, c: Completion) -> Result<()>; + fn read_page(&self, page_idx: usize, c: Completion) -> Result; fn write_page( &self, page_idx: usize, buffer: Arc>, c: Completion, - ) -> Result<()>; - fn sync(&self, c: Completion) -> Result<()>; + ) -> Result; + fn sync(&self, c: Completion) -> Result; fn size(&self) -> Result; } @@ -34,7 +33,7 @@ unsafe impl Sync for DatabaseFile {} #[cfg(feature = "fs")] impl DatabaseStorage for DatabaseFile { #[instrument(skip_all, level = Level::DEBUG)] - fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> { + fn read_page(&self, page_idx: usize, c: Completion) -> Result { let r = c.as_read(); let size = r.buf().len(); assert!(page_idx > 0); @@ -42,8 +41,7 @@ impl DatabaseStorage for DatabaseFile { return Err(LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c.into())?; - Ok(()) + self.file.pread(pos, c) } #[instrument(skip_all, level = Level::DEBUG)] @@ -52,21 +50,19 @@ impl DatabaseStorage for DatabaseFile { page_idx: usize, buffer: Arc>, c: Completion, - ) -> Result<()> { + ) -> Result { let buffer_size = buffer.borrow().len(); assert!(page_idx > 0); assert!(buffer_size >= 512); assert!(buffer_size <= 65536); assert_eq!(buffer_size & (buffer_size - 1), 0); let pos = (page_idx - 1) * buffer_size; - self.file.pwrite(pos, buffer, c.into())?; - Ok(()) + self.file.pwrite(pos, buffer, c) } #[instrument(skip_all, level = Level::DEBUG)] - fn sync(&self, c: Completion) -> Result<()> { - let _ = self.file.sync(c.into())?; - Ok(()) + fn sync(&self, c: Completion) -> Result { + self.file.sync(c) } #[instrument(skip_all, level = Level::DEBUG)] @@ -91,19 +87,15 @@ unsafe impl Sync for FileMemoryStorage {} impl DatabaseStorage for FileMemoryStorage { #[instrument(skip_all, level = Level::DEBUG)] - fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> { - let r = match c.completion_type { - CompletionType::Read(ref r) => r, - _ => unreachable!(), - }; + fn read_page(&self, page_idx: usize, c: Completion) -> Result { + let r = c.as_read(); let size = r.buf().len(); assert!(page_idx > 0); if !(512..=65536).contains(&size) || size & (size - 1) != 0 { return Err(LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c.into())?; - Ok(()) + self.file.pread(pos, c) } #[instrument(skip_all, level = Level::DEBUG)] @@ -112,20 +104,18 @@ impl DatabaseStorage for FileMemoryStorage { page_idx: usize, buffer: Arc>, c: Completion, - ) -> Result<()> { + ) -> Result { let buffer_size = buffer.borrow().len(); assert!(buffer_size >= 512); assert!(buffer_size <= 65536); assert_eq!(buffer_size & (buffer_size - 1), 0); let pos = (page_idx - 1) * buffer_size; - self.file.pwrite(pos, buffer, c.into())?; - Ok(()) + self.file.pwrite(pos, buffer, c) } #[instrument(skip_all, level = Level::DEBUG)] - fn sync(&self, c: Completion) -> Result<()> { - let _ = self.file.sync(c.into())?; - Ok(()) + fn sync(&self, c: Completion) -> Result { + self.file.sync(c) } #[instrument(skip_all, level = Level::DEBUG)] diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 49b1e344e..f03b69252 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -881,7 +881,7 @@ impl Pager { return Ok(page); } - sqlite3_ondisk::begin_read_page( + let c = sqlite3_ondisk::begin_read_page( self.db_file.clone(), self.buffer_pool.clone(), page.clone(), @@ -1167,7 +1167,7 @@ impl Pager { } #[instrument(skip_all, level = Level::DEBUG)] - pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result> { + pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result { let wal = self.wal.borrow(); wal.read_frame_raw(frame_no.into(), frame) } @@ -1464,7 +1464,7 @@ impl Pager { (default_header.get_page_size() - default_header.reserved_space as u32) as u16, ); let write_counter = Rc::new(RefCell::new(0)); - begin_write_btree_page(self, &page1.get(), write_counter.clone())?; + let c = begin_write_btree_page(self, &page1.get(), write_counter.clone())?; self.allocate_page1_state .replace(AllocatePage1State::Writing { diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index fbea32e24..83f669a0d 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -762,7 +762,7 @@ pub fn begin_read_page( buffer_pool: Arc, page: PageRef, page_idx: usize, -) -> Result<()> { +) -> Result { tracing::trace!("begin_read_btree_page(page_idx = {})", page_idx); let buf = buffer_pool.get(); let drop_fn = Rc::new(move |buf| { @@ -783,8 +783,7 @@ pub fn begin_read_page( } }); let c = Completion::new_read(buf, complete); - db_file.read_page(page_idx, c)?; - Ok(()) + db_file.read_page(page_idx, c) } #[instrument(skip_all, level = Level::INFO)] @@ -814,7 +813,7 @@ pub fn begin_write_btree_page( pager: &Pager, page: &PageRef, write_counter: Rc>, -) -> Result<()> { +) -> Result { tracing::trace!("begin_write_btree_page(page={})", page.get().id); let page_source = &pager.db_file; let page_finish = page.clone(); @@ -861,7 +860,7 @@ pub fn begin_sync(db_file: Arc, syncing: Rc>) *syncing.borrow_mut() = false; }); #[allow(clippy::arc_with_non_send_sync)] - db_file.sync(completion)?; + let c = db_file.sync(completion)?; Ok(()) } @@ -1566,7 +1565,7 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result>, i32)>, -) -> Result> { +) -> Result { tracing::trace!("begin_read_wal_frame_raw(offset={})", offset); let drop_fn = Rc::new(|_buf| {}); let buf = Arc::new(RefCell::new(Buffer::allocate( @@ -1585,7 +1584,7 @@ pub fn begin_read_wal_frame_raw( ))); #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_read(buf, complete); - let c = io.pread(offset, c.into())?; + let c = io.pread(offset, c)?; Ok(c) } @@ -1594,7 +1593,7 @@ pub fn begin_read_wal_frame( offset: usize, buffer_pool: Arc, complete: Box>, i32)>, -) -> Result> { +) -> Result { tracing::trace!("begin_read_wal_frame(offset={})", offset); let buf = buffer_pool.get(); let drop_fn = Rc::new(move |buf| { @@ -1604,7 +1603,7 @@ pub fn begin_read_wal_frame( let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn))); #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_read(buf, complete); - let c = io.pread(offset, c.into())?; + let c = io.pread(offset, c)?; Ok(c) } @@ -1694,7 +1693,7 @@ pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result< }; #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_write(write_complete); - io.pwrite(0, buffer.clone(), c.into())?; + let c = io.pwrite(0, buffer.clone(), c)?; Ok(()) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index c02af788d..af470eb1b 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -209,7 +209,7 @@ pub trait Wal { fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc) -> Result<()>; /// Read a raw frame (header included) from the WAL. - fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result>; + fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result; /// Write a raw frame (header included) from the WAL. /// Note, that turso-db will use page_no and size_after fields from the header, but will overwrite checksum with proper value @@ -284,7 +284,7 @@ impl Wal for DummyWAL { Ok(()) } - fn read_frame_raw(&self, _frame_id: u64, _frame: &mut [u8]) -> Result> { + fn read_frame_raw(&self, _frame_id: u64, _frame: &mut [u8]) -> Result { todo!(); } @@ -626,7 +626,7 @@ impl Wal for WalFile { let frame = frame.clone(); finish_read_page(page.get().id, buf, frame).unwrap(); }); - begin_read_wal_frame( + let c = begin_read_wal_frame( &self.get_shared().file, offset + WAL_FRAME_HEADER_SIZE, buffer_pool, @@ -636,7 +636,7 @@ impl Wal for WalFile { } #[instrument(skip_all, level = Level::DEBUG)] - fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result> { + fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result { tracing::debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); let (frame_ptr, frame_len) = (frame.as_mut_ptr(), frame.len()); @@ -731,7 +731,7 @@ impl Wal for WalFile { db_size as u32, page, ); - let c = Arc::new(Completion::new_write(|_| {})); + let c = Completion::new_write(|_| {}); let c = shared.file.pwrite(offset, frame_bytes, c)?; self.io.wait_for_completion(c)?; self.complete_append_frame(page_id, frame_id, checksums); @@ -784,7 +784,7 @@ impl Wal for WalFile { *write_counter.borrow_mut() -= 1; } }); - let result = shared.file.pwrite(offset, frame_bytes.clone(), c.into()); + let result = shared.file.pwrite(offset, frame_bytes.clone(), c); if let Err(err) = result { *write_counter.borrow_mut() -= 1; return Err(err); @@ -914,7 +914,7 @@ impl Wal for WalFile { } CheckpointState::WritePage => { self.ongoing_checkpoint.page.set_dirty(); - begin_write_btree_page( + let c = begin_write_btree_page( pager, &self.ongoing_checkpoint.page, write_counter.clone(), @@ -1001,7 +1001,7 @@ impl Wal for WalFile { syncing.set(false); }); let shared = self.get_shared(); - shared.file.sync(completion.into())?; + let c = shared.file.sync(completion)?; self.sync_state.set(SyncState::Syncing); Ok(IOResult::IO) } diff --git a/core/types.rs b/core/types.rs index 96537bdfa..113e0ce09 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2323,6 +2323,7 @@ impl Cursor { } #[derive(Debug)] +#[must_use] pub enum IOResult { Done(T), IO, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index b5ad3e599..a9f21431e 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -6357,7 +6357,7 @@ pub fn op_open_ephemeral( } else { BTreeCursor::new_table(mv_cursor, pager.clone(), root_page as usize, num_columns) }; - cursor.rewind()?; // Will never return io + let res = cursor.rewind()?; // Will never return io let mut cursors: std::cell::RefMut<'_, Vec>> = state.cursors.borrow_mut(); diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 021256d2a..72edca61a 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -402,7 +402,7 @@ impl SortedChunk { read_buffer_ref, read_complete, ))); - self.file.pread(self.total_bytes_read.get(), Arc::new(c))?; + let c = self.file.pread(self.total_bytes_read.get(), c)?; Ok(()) } @@ -448,7 +448,7 @@ impl SortedChunk { }); let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); - self.file.pwrite(0, buffer_ref, Arc::new(c))?; + let c = self.file.pwrite(0, buffer_ref, c)?; Ok(()) } } diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index d1682c353..1be2cb48b 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -38,12 +38,12 @@ pub(crate) struct SimulatorFile { pub latency_probability: usize, - pub sync_completion: RefCell>>, + pub sync_completion: RefCell>, pub queued_io: RefCell>, pub clock: Arc, } -type IoOperation = Box Result>>; +type IoOperation = Box Result>; pub struct DelayedIo { pub time: turso_core::Instant, @@ -121,7 +121,7 @@ impl SimulatorFile { if queued_io[i].time <= now { let io = queued_io.remove(i); // your code here - (io.op)(self)?; + let c = (io.op)(self)?; } else { i += 1; } @@ -149,11 +149,7 @@ impl File for SimulatorFile { self.inner.unlock_file() } - fn pread( - &self, - pos: usize, - c: Arc, - ) -> Result> { + fn pread(&self, pos: usize, c: turso_core::Completion) -> Result { self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); if self.fault.get() { tracing::debug!("pread fault"); @@ -178,8 +174,8 @@ impl File for SimulatorFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: turso_core::Completion, + ) -> Result { self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); if self.fault.get() { tracing::debug!("pwrite fault"); @@ -200,7 +196,7 @@ impl File for SimulatorFile { } } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: turso_core::Completion) -> Result { self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); if self.fault.get() { // TODO: Enable this when https://github.com/tursodatabase/turso/issues/2091 is fixed. diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index 7c888cc7f..29973f889 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -104,7 +104,7 @@ impl IO for SimulatorIO { Ok(file) } - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: turso_core::Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } diff --git a/tests/integration/query_processing/test_btree.rs b/tests/integration/query_processing/test_btree.rs index 4dc19f1c8..a50100b42 100644 --- a/tests/integration/query_processing/test_btree.rs +++ b/tests/integration/query_processing/test_btree.rs @@ -433,7 +433,7 @@ fn write_at(io: &impl IO, file: Arc, offset: usize, data: &[u8]) { let drop_fn = Rc::new(move |_| {}); #[allow(clippy::arc_with_non_send_sync)] let buffer = Arc::new(RefCell::new(Buffer::new(Pin::new(data.to_vec()), drop_fn))); - let result = file.pwrite(offset, buffer, completion.into()).unwrap(); + let result = file.pwrite(offset, buffer, completion).unwrap(); while !result.is_completed() { io.run_once().unwrap(); }