diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 7d84d6cc3..4b029fdc9 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -444,8 +444,8 @@ impl DatabaseFile { } impl limbo_core::DatabaseStorage for DatabaseFile { - fn read_page(&self, page_idx: usize, c: limbo_core::Completion) -> limbo_core::Result<()> { - let r = match c { + fn read_page(&self, page_idx: usize, c: Arc) -> limbo_core::Result<()> { + let r = match *c { limbo_core::Completion::Read(ref r) => r, _ => unreachable!(), }; @@ -463,7 +463,7 @@ impl limbo_core::DatabaseStorage for DatabaseFile { &self, page_idx: usize, buffer: Arc>, - c: limbo_core::Completion, + c: Arc, ) -> limbo_core::Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; @@ -471,7 +471,7 @@ impl limbo_core::DatabaseStorage for DatabaseFile { Ok(()) } - fn sync(&self, c: limbo_core::Completion) -> limbo_core::Result<()> { + fn sync(&self, c: Arc) -> limbo_core::Result<()> { self.file.sync(c) } } diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index ffbe90928..4277f1f3f 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -208,8 +208,8 @@ impl limbo_core::File for File { Ok(()) } - fn pread(&self, pos: usize, c: limbo_core::Completion) -> Result<()> { - let r = match &c { + fn pread(&self, pos: usize, c: Arc) -> Result<()> { + let r = match *c { limbo_core::Completion::Read(ref r) => r, _ => unreachable!(), }; @@ -227,9 +227,9 @@ impl limbo_core::File for File { &self, pos: usize, buffer: Arc>, - c: limbo_core::Completion, + c: Arc, ) -> Result<()> { - let w = match &c { + let w = match *c { limbo_core::Completion::Write(ref w) => w, _ => unreachable!(), }; @@ -240,7 +240,7 @@ impl limbo_core::File for File { Ok(()) } - fn sync(&self, c: limbo_core::Completion) -> Result<()> { + fn sync(&self, c: Arc) -> Result<()> { self.vfs.sync(self.fd); c.complete(0); Ok(()) @@ -326,8 +326,8 @@ impl DatabaseFile { } impl limbo_core::DatabaseStorage for DatabaseFile { - fn read_page(&self, page_idx: usize, c: limbo_core::Completion) -> Result<()> { - let r = match c { + fn read_page(&self, page_idx: usize, c: Arc) -> Result<()> { + let r = match *c { limbo_core::Completion::Read(ref r) => r, _ => unreachable!(), }; @@ -345,7 +345,7 @@ impl limbo_core::DatabaseStorage for DatabaseFile { &self, page_idx: usize, buffer: Arc>, - c: limbo_core::Completion, + c: Arc, ) -> Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; @@ -353,7 +353,7 @@ impl limbo_core::DatabaseStorage for DatabaseFile { Ok(()) } - fn sync(&self, _c: limbo_core::Completion) -> Result<()> { + fn sync(&self, _c: Arc) -> Result<()> { todo!() } } diff --git a/core/io/generic.rs b/core/io/generic.rs index aab5f2687..60e11f119 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -79,11 +79,11 @@ impl File for GenericFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result<()> { + fn pread(&self, pos: usize, c: Arc) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { - let r = match c { + let r = match *c { Completion::Read(ref r) => r, _ => unreachable!(), }; @@ -95,7 +95,7 @@ impl File for GenericFile { Ok(()) } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let buf = buffer.borrow(); @@ -105,7 +105,7 @@ impl File for GenericFile { Ok(()) } - fn sync(&self, c: Completion) -> Result<()> { + fn sync(&self, c: Arc) -> Result<()> { let mut file = self.file.borrow_mut(); file.sync_all().map_err(|err| LimboError::IOError(err))?; c.complete(0); diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 25d6aa33e..44d4c5696 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -1,3 +1,5 @@ +#![allow(clippy::arc_with_non_send_sync)] + use super::{common, Completion, File, OpenFlags, WriteCompletion, IO}; use crate::io::clock::{Clock, Instant}; use crate::{LimboError, MemoryIO, Result}; @@ -93,7 +95,7 @@ impl InnerUringIO { } impl WrappedIOUring { - fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Completion) { + fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Arc) { trace!("submit_entry({:?})", entry); self.pending[entry.get_user_data() as usize] = Some(c); unsafe { @@ -263,7 +265,7 @@ impl File for UringFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result<()> { + fn pread(&self, pos: usize, c: Arc) -> Result<()> { let r = c.as_read(); trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let fd = io_uring::types::Fd(self.file.as_raw_fd()); @@ -282,7 +284,7 @@ impl File for UringFile { Ok(()) } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { let mut io = self.io.borrow_mut(); let fd = io_uring::types::Fd(self.file.as_raw_fd()); let write = { diff --git a/core/io/memory.rs b/core/io/memory.rs index 9cc56a5e3..cb20c6e63 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -79,7 +79,7 @@ impl File for MemoryFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result<()> { + fn pread(&self, pos: usize, c: Arc) -> Result<()> { let r = c.as_read(); let buf_len = r.buf().len(); if buf_len == 0 { @@ -120,7 +120,7 @@ impl File for MemoryFile { Ok(()) } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { let buf = buffer.borrow(); let buf_len = buf.len(); if buf_len == 0 { @@ -156,7 +156,7 @@ impl File for MemoryFile { Ok(()) } - fn sync(&self, c: Completion) -> Result<()> { + fn sync(&self, c: Arc) -> Result<()> { // no-op c.complete(0); Ok(()) diff --git a/core/io/mod.rs b/core/io/mod.rs index 6f75e9bea..4cbe785c5 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -14,9 +14,9 @@ use std::{ pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; - fn pread(&self, pos: usize, c: Completion) -> Result<()>; - fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) -> Result<()>; - fn sync(&self, c: Completion) -> Result<()>; + fn pread(&self, pos: usize, c: Arc) -> Result<()>; + fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()>; + fn sync(&self, c: Arc) -> Result<()>; fn size(&self) -> Result; } diff --git a/core/io/unix.rs b/core/io/unix.rs index 721ba20f3..8c2f121e3 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -268,10 +268,10 @@ impl IO for UnixIO { } enum CompletionCallback { - Read(Arc>, Completion, usize), + Read(Arc>, Arc, usize), Write( Arc>, - Completion, + Arc, Arc>, usize, ), @@ -326,7 +326,7 @@ impl File for UnixFile<'_> { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result<()> { + fn pread(&self, pos: usize, c: Arc) -> Result<()> { let file = self.file.borrow(); let result = { let r = c.as_read(); @@ -358,7 +358,7 @@ impl File for UnixFile<'_> { } } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { let file = self.file.borrow(); let result = { let buf = buffer.borrow(); @@ -387,7 +387,7 @@ impl File for UnixFile<'_> { } } - fn sync(&self, c: Completion) -> Result<()> { + fn sync(&self, c: Arc) -> Result<()> { let file = self.file.borrow(); let result = fs::fsync(file.as_fd()); match result { diff --git a/core/io/vfs.rs b/core/io/vfs.rs index d02f7d345..7e953ebd0 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -93,8 +93,8 @@ impl File for VfsFileImpl { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result<()> { - let r = match &c { + fn pread(&self, pos: usize, c: Arc) -> Result<()> { + let r = match &*c { Completion::Read(ref r) => r, _ => unreachable!(), }; @@ -112,7 +112,7 @@ impl File for VfsFileImpl { } } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { let buf = buffer.borrow(); let count = buf.as_slice().len(); if self.vfs.is_null() { @@ -136,7 +136,7 @@ impl File for VfsFileImpl { } } - fn sync(&self, c: Completion) -> Result<()> { + fn sync(&self, c: Arc) -> Result<()> { let vfs = unsafe { &*self.vfs }; let result = unsafe { (vfs.sync)(self.file) }; if result < 0 { diff --git a/core/lib.rs b/core/lib.rs index 038fc6218..e60d74502 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -1,3 +1,5 @@ +#![allow(clippy::arc_with_non_send_sync)] + mod error; mod ext; mod fast_lock; @@ -302,7 +304,8 @@ pub fn maybe_init_database_file(file: &Arc, io: &Arc) -> Resul let completion = Completion::Write(WriteCompletion::new(Box::new(move |_| { *flag_complete.borrow_mut() = true; }))); - file.pwrite(0, contents.buffer.clone(), completion)?; + #[allow(clippy::arc_with_non_send_sync)] + file.pwrite(0, contents.buffer.clone(), Arc::new(completion))?; } let mut limit = 100; loop { diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 57e88802a..735f0495e 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -6776,7 +6776,7 @@ mod tests { let write_complete = Box::new(|_| {}); let c = Completion::Write(WriteCompletion::new(write_complete)); - db_file.write_page(1, buf.clone(), c).unwrap(); + db_file.write_page(1, buf.clone(), Arc::new(c)).unwrap(); let wal_shared = WalFileShared::open_shared(&io, "test.wal", page_size).unwrap(); let wal = Rc::new(RefCell::new(WalFile::new( @@ -6826,9 +6826,10 @@ mod tests { ))); let write_complete = Box::new(|_| {}); let c = Completion::Write(WriteCompletion::new(write_complete)); + #[allow(clippy::arc_with_non_send_sync)] pager .db_file - .write_page(current_page as usize, buf.clone(), c)?; + .write_page(current_page as usize, buf.clone(), Arc::new(c))?; pager.io.run_once()?; let page = cursor.read_page(current_page as usize)?; diff --git a/core/storage/database.rs b/core/storage/database.rs index cf8b57d8e..db57892ae 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -8,14 +8,14 @@ use std::{cell::RefCell, sync::Arc}; /// the storage medium. A database can either be a file on disk, like in SQLite, /// or something like a remote page server service. pub trait DatabaseStorage: Send + Sync { - fn read_page(&self, page_idx: usize, c: Completion) -> Result<()>; + fn read_page(&self, page_idx: usize, c: Arc) -> Result<()>; fn write_page( &self, page_idx: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result<()>; - fn sync(&self, c: Completion) -> Result<()>; + fn sync(&self, c: Arc) -> Result<()>; } #[cfg(feature = "fs")] @@ -30,7 +30,7 @@ unsafe impl Sync for DatabaseFile {} #[cfg(feature = "fs")] impl DatabaseStorage for DatabaseFile { - fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> { + fn read_page(&self, page_idx: usize, c: Arc) -> Result<()> { let r = c.as_read(); let size = r.buf().len(); assert!(page_idx > 0); @@ -46,7 +46,7 @@ impl DatabaseStorage for DatabaseFile { &self, page_idx: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result<()> { let buffer_size = buffer.borrow().len(); assert!(page_idx > 0); @@ -58,7 +58,7 @@ impl DatabaseStorage for DatabaseFile { Ok(()) } - fn sync(&self, c: Completion) -> Result<()> { + fn sync(&self, c: Arc) -> Result<()> { self.file.sync(c) } } @@ -78,8 +78,8 @@ unsafe impl Send for FileMemoryStorage {} unsafe impl Sync for FileMemoryStorage {} impl DatabaseStorage for FileMemoryStorage { - fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> { - let r = match c { + fn read_page(&self, page_idx: usize, c: Arc) -> Result<()> { + let r = match *c { Completion::Read(ref r) => r, _ => unreachable!(), }; @@ -97,7 +97,7 @@ impl DatabaseStorage for FileMemoryStorage { &self, page_idx: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result<()> { let buffer_size = buffer.borrow().len(); assert!(buffer_size >= 512); @@ -108,7 +108,7 @@ impl DatabaseStorage for FileMemoryStorage { Ok(()) } - fn sync(&self, c: Completion) -> Result<()> { + fn sync(&self, c: Arc) -> Result<()> { self.file.sync(c) } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 30035d1ab..dcbed64d0 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -41,6 +41,8 @@ //! //! https://www.sqlite.org/fileformat.html +#![allow(clippy::arc_with_non_send_sync)] + use crate::error::LimboError; use crate::fast_lock::SpinLock; use crate::io::{Buffer, Complete, Completion, ReadCompletion, SyncCompletion, WriteCompletion}; @@ -293,7 +295,8 @@ pub fn begin_read_database_header( finish_read_database_header(buf, header).unwrap(); }); let c = Completion::Read(ReadCompletion::new(buf, complete)); - db_file.read_page(1, c)?; + #[allow(clippy::arc_with_non_send_sync)] + db_file.read_page(1, Arc::new(c))?; Ok(result) } @@ -355,7 +358,8 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re #[allow(clippy::arc_with_non_send_sync)] let buf = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn))); let c = Completion::Read(ReadCompletion::new(buf, read_complete)); - page_source.read_page(1, c)?; + #[allow(clippy::arc_with_non_send_sync)] + page_source.read_page(1, Arc::new(c))?; // run get header block pager.io.run_once()?; @@ -369,7 +373,7 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re }); let c = Completion::Write(WriteCompletion::new(write_complete)); - page_source.write_page(1, buffer_to_copy, c)?; + page_source.write_page(1, buffer_to_copy, Arc::new(c))?; Ok(()) } @@ -819,7 +823,7 @@ pub fn begin_read_page( } }); let c = Completion::Read(ReadCompletion::new(buf, complete)); - db_file.read_page(page_idx, c)?; + db_file.read_page(page_idx, Arc::new(c))?; Ok(()) } @@ -877,7 +881,7 @@ pub fn begin_write_btree_page( }) }; let c = Completion::Write(WriteCompletion::new(write_complete)); - page_source.write_page(page_id, buffer.clone(), c)?; + page_source.write_page(page_id, buffer.clone(), Arc::new(c))?; Ok(()) } @@ -889,7 +893,8 @@ pub fn begin_sync(db_file: Arc, syncing: Rc>) *syncing.borrow_mut() = false; }), }); - db_file.sync(completion)?; + #[allow(clippy::arc_with_non_send_sync)] + db_file.sync(Arc::new(completion))?; Ok(()) } @@ -1519,7 +1524,7 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result, header: &WalHeader) -> Result< } }) }; - let c = Completion::Write(WriteCompletion::new(write_complete)); + #[allow(clippy::arc_with_non_send_sync)] + let c = Arc::new(Completion::Write(WriteCompletion::new(write_complete))); io.pwrite(0, buffer.clone(), c)?; Ok(()) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index edb657f39..4eab23e44 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,3 +1,5 @@ +#![allow(clippy::arc_with_non_send_sync)] + use std::cell::UnsafeCell; use std::collections::HashMap; use tracing::{debug, trace}; @@ -750,7 +752,7 @@ impl Wal for WalFile { *syncing.borrow_mut() = false; }), }); - shared.file.sync(completion)?; + shared.file.sync(Arc::new(completion))?; } self.sync_state.replace(SyncState::Syncing); Ok(WalFsyncStatus::IO) diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 3e66a02fa..5d1c2cd4b 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -77,7 +77,7 @@ impl File for SimulatorFile { self.inner.unlock_file() } - fn pread(&self, pos: usize, c: limbo_core::Completion) -> Result<()> { + fn pread(&self, pos: usize, c: Arc) -> Result<()> { *self.nr_pread_calls.borrow_mut() += 1; if *self.fault.borrow() { *self.nr_pread_faults.borrow_mut() += 1; @@ -92,7 +92,7 @@ impl File for SimulatorFile { &self, pos: usize, buffer: Arc>, - c: limbo_core::Completion, + c: Arc, ) -> Result<()> { *self.nr_pwrite_calls.borrow_mut() += 1; if *self.fault.borrow() { @@ -104,7 +104,7 @@ impl File for SimulatorFile { self.inner.pwrite(pos, buffer, c) } - fn sync(&self, c: limbo_core::Completion) -> Result<()> { + fn sync(&self, c: Arc) -> Result<()> { *self.nr_sync_calls.borrow_mut() += 1; self.inner.sync(c) }