diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 99712bd62..0433d8d09 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -583,7 +583,7 @@ impl DatabaseFile { } impl limbo_core::DatabaseStorage for DatabaseFile { - fn read_page(&self, page_idx: usize, c: Arc) -> limbo_core::Result<()> { + fn read_page(&self, page_idx: usize, c: limbo_core::Completion) -> limbo_core::Result<()> { let r = match c.completion_type { limbo_core::CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -602,7 +602,7 @@ impl limbo_core::DatabaseStorage for DatabaseFile { &self, page_idx: usize, buffer: Arc>, - c: Arc, + c: limbo_core::Completion, ) -> limbo_core::Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; @@ -610,8 +610,9 @@ impl limbo_core::DatabaseStorage for DatabaseFile { Ok(()) } - fn sync(&self, c: Arc) -> limbo_core::Result<()> { - self.file.sync(c) + fn sync(&self, c: limbo_core::Completion) -> limbo_core::Result<()> { + let _ = self.file.sync(c)?; + Ok(()) } fn size(&self) -> limbo_core::Result { diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index e1c18edc2..3a19b3949 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -211,7 +211,7 @@ impl limbo_core::File for File { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result<()> { + fn pread(&self, pos: usize, c: limbo_core::Completion) -> Result> { let r = match c.completion_type { limbo_core::CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -223,15 +223,15 @@ impl limbo_core::File for File { assert!(nr >= 0); } r.complete(); - Ok(()) + Ok(Arc::new(c)) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result<()> { + c: limbo_core::Completion, + ) -> Result> { let w = match c.completion_type { limbo_core::CompletionType::Write(ref w) => w, _ => unreachable!(), @@ -240,13 +240,13 @@ impl limbo_core::File for File { let buf: &[u8] = buf.as_slice(); self.vfs.pwrite(self.fd, buf, pos); w.complete(buf.len() as i32); - Ok(()) + Ok(Arc::new(c)) } - fn sync(&self, c: Arc) -> Result<()> { + fn sync(&self, c: limbo_core::Completion) -> Result> { self.vfs.sync(self.fd); c.complete(0); - Ok(()) + Ok(Arc::new(c)) } fn size(&self) -> Result { @@ -336,7 +336,7 @@ impl DatabaseFile { } impl limbo_core::DatabaseStorage for DatabaseFile { - fn read_page(&self, page_idx: usize, c: Arc) -> Result<()> { + fn read_page(&self, page_idx: usize, c: limbo_core::Completion) -> Result<()> { let r = match c.completion_type { limbo_core::CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -355,7 +355,7 @@ impl limbo_core::DatabaseStorage for DatabaseFile { &self, page_idx: usize, buffer: Arc>, - c: Arc, + c: limbo_core::Completion, ) -> Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; @@ -363,8 +363,9 @@ impl limbo_core::DatabaseStorage for DatabaseFile { Ok(()) } - fn sync(&self, c: Arc) -> Result<()> { - self.file.sync(c) + fn sync(&self, c: limbo_core::Completion) -> Result> { + let _ = self.file.sync(c)?; + Ok(()) } fn size(&self) -> Result { diff --git a/core/io/generic.rs b/core/io/generic.rs index aaaf2e305..6e1faee73 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -86,7 +86,7 @@ impl File for GenericFile { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result<()> { + fn pread(&self, pos: usize, c: Completion) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { @@ -99,29 +99,29 @@ impl File for GenericFile { file.read_exact(buf)?; } c.complete(0); - Ok(()) + Ok(Arc::new(c)) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result<()> { + c: Completion, + ) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let buf = buffer.borrow(); let buf = buf.as_slice(); file.write_all(buf)?; c.complete(buf.len() as i32); - Ok(()) + Ok(Arc::new(c)) } - fn sync(&self, c: Arc) -> Result<()> { + fn sync(&self, c: Completion) -> Result> { let mut file = self.file.borrow_mut(); file.sync_all().map_err(|err| LimboError::IOError(err))?; c.complete(0); - Ok(()) + Ok(Arc::new(c)) } fn size(&self) -> Result { diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 307a7a2f2..f0b9bb910 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -273,7 +273,7 @@ impl File for UringFile { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result<()> { + fn pread(&self, pos: usize, c: Completion) -> Result> { let r = c.as_read(); trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let fd = io_uring::types::Fd(self.file.as_raw_fd()); @@ -288,16 +288,17 @@ impl File for UringFile { .build() .user_data(io.ring.get_key()) }; - io.ring.submit_entry(&read_e, c); - Ok(()) + let c = Arc::new(c); + io.ring.submit_entry(&read_e, c.clone()); + Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result<()> { + c: Completion, + ) -> Result> { let mut io = self.io.borrow_mut(); let fd = io_uring::types::Fd(self.file.as_raw_fd()); let write = { @@ -309,6 +310,7 @@ impl File for UringFile { .build() .user_data(io.ring.get_key()) }; + let c = Arc::new(c); io.ring.submit_entry( &write, Arc::new(Completion::new(CompletionType::Write( @@ -319,18 +321,19 @@ impl File for UringFile { })), ))), ); - Ok(()) + Ok(c) } - fn sync(&self, c: Arc) -> Result<()> { + fn sync(&self, c: Completion) -> Result> { let fd = io_uring::types::Fd(self.file.as_raw_fd()); let mut io = self.io.borrow_mut(); trace!("sync()"); let sync = io_uring::opcode::Fsync::new(fd) .build() .user_data(io.ring.get_key()); - io.ring.submit_entry(&sync, c); - Ok(()) + let c = Arc::new(c); + io.ring.submit_entry(&sync, c.clone()); + Ok(c) } fn size(&self) -> Result { diff --git a/core/io/memory.rs b/core/io/memory.rs index 415da8ca3..1eafe9feb 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -83,18 +83,19 @@ impl File for MemoryFile { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result<()> { + fn pread(&self, pos: usize, c: Completion) -> Result> { + let c = Arc::new(c); let r = c.as_read(); let buf_len = r.buf().len(); if buf_len == 0 { c.complete(0); - return Ok(()); + return Ok(c); } let file_size = self.size.get(); if pos >= file_size { c.complete(0); - return Ok(()); + return Ok(c); } let read_len = buf_len.min(file_size - pos); @@ -121,15 +122,16 @@ impl File for MemoryFile { } } c.complete(read_len as i32); - Ok(()) + Ok(c) } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) -> Result> { + let c = Arc::new(c); let buf = buffer.borrow(); let buf_len = buf.len(); if buf_len == 0 { c.complete(0); - return Ok(()); + return Ok(c); } let mut offset = pos; @@ -157,13 +159,13 @@ impl File for MemoryFile { .set(core::cmp::max(pos + buf_len, self.size.get())); c.complete(buf_len as i32); - Ok(()) + Ok(c) } - fn sync(&self, c: Arc) -> Result<()> { + fn sync(&self, c: Completion) -> Result> { // no-op c.complete(0); - Ok(()) + Ok(Arc::new(c)) } fn size(&self) -> Result { diff --git a/core/io/mod.rs b/core/io/mod.rs index 0985a8070..7e503caf4 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -14,9 +14,9 @@ use std::{ pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; - fn pread(&self, pos: usize, c: Arc) -> Result<()>; - fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()>; - fn sync(&self, c: Arc) -> Result<()>; + fn pread(&self, pos: usize, c: Completion) -> Result>; + fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) -> Result>; + fn sync(&self, c: Completion) -> Result>; fn size(&self) -> Result; } diff --git a/core/io/unix.rs b/core/io/unix.rs index 6772d6e45..76dfe3c05 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -333,19 +333,20 @@ impl File for UnixFile<'_> { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result<()> { + fn pread(&self, pos: usize, c: Completion) -> Result> { let file = self.file.borrow(); let result = { let r = c.as_read(); let mut buf = r.buf_mut(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) }; + let c = Arc::new(c); match result { Ok(n) => { trace!("pread n: {}", n); // Read succeeded immediately c.complete(0); - Ok(()) + Ok(c) } Err(Errno::AGAIN) => { trace!("pread blocks"); @@ -356,27 +357,33 @@ impl File for UnixFile<'_> { { self.callbacks.insert( fd as usize, - CompletionCallback::Read(self.file.clone(), c, pos), + CompletionCallback::Read(self.file.clone(), c.clone(), pos), ); } - Ok(()) + Ok(c) } Err(e) => Err(e.into()), } } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { + fn pwrite( + &self, + pos: usize, + buffer: Arc>, + c: Completion, + ) -> Result> { let file = self.file.borrow(); let result = { let buf = buffer.borrow(); rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64) }; + let c = Arc::new(c); match result { Ok(n) => { trace!("pwrite n: {}", n); // Read succeeded immediately c.complete(n as i32); - Ok(()) + Ok(c) } Err(Errno::AGAIN) => { trace!("pwrite blocks"); @@ -386,22 +393,23 @@ impl File for UnixFile<'_> { .add(&file.as_fd(), Event::readable(fd as usize))?; self.callbacks.insert( fd as usize, - CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos), + CompletionCallback::Write(self.file.clone(), c.clone(), buffer.clone(), pos), ); - Ok(()) + Ok(c) } Err(e) => Err(e.into()), } } - fn sync(&self, c: Arc) -> Result<()> { + fn sync(&self, c: Completion) -> Result> { let file = self.file.borrow(); let result = fs::fsync(file.as_fd()); + let c = Arc::new(c); match result { Ok(()) => { trace!("fsync"); c.complete(0); - Ok(()) + Ok(c) } Err(e) => Err(e.into()), } diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 9838bbcbf..a659afdb4 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -98,7 +98,7 @@ impl File for VfsFileImpl { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result<()> { + fn pread(&self, pos: usize, c: Completion) -> Result> { let r = match c.completion_type { CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -113,11 +113,16 @@ impl File for VfsFileImpl { Err(LimboError::ExtensionError("pread failed".to_string())) } else { c.complete(result); - Ok(()) + Ok(Arc::new(c)) } } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { + fn pwrite( + &self, + pos: usize, + buffer: Arc>, + c: Completion, + ) -> Result> { let buf = buffer.borrow(); let count = buf.as_slice().len(); if self.vfs.is_null() { @@ -137,18 +142,18 @@ impl File for VfsFileImpl { Err(LimboError::ExtensionError("pwrite failed".to_string())) } else { c.complete(result); - Ok(()) + Ok(Arc::new(c)) } } - fn sync(&self, c: Arc) -> Result<()> { + fn sync(&self, c: Completion) -> Result> { let vfs = unsafe { &*self.vfs }; let result = unsafe { (vfs.sync)(self.file) }; if result < 0 { Err(LimboError::ExtensionError("sync failed".to_string())) } else { c.complete(0); - Ok(()) + Ok(Arc::new(c)) } } diff --git a/core/io/windows.rs b/core/io/windows.rs index dfa4dc241..cadad3599 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -81,7 +81,7 @@ impl File for WindowsFile { unimplemented!() } - fn pread(&self, pos: usize, c: Arc) -> Result<()> { + fn pread(&self, pos: usize, c: Completion) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { @@ -91,24 +91,29 @@ impl File for WindowsFile { file.read_exact(buf)?; } c.complete(0); - Ok(()) + Ok(Arc::new(c)) } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { + fn pwrite( + &self, + pos: usize, + buffer: Arc>, + c: Completion, + ) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let buf = buffer.borrow(); let buf = buf.as_slice(); file.write_all(buf)?; c.complete(buffer.borrow().len() as i32); - Ok(()) + Ok(Arc::new(c)) } - fn sync(&self, c: Arc) -> Result<()> { + fn sync(&self, c: Completion) -> Result> { let file = self.file.borrow_mut(); file.sync_all().map_err(LimboError::IOError)?; c.complete(0); - Ok(()) + Ok(Arc::new(c)) } fn size(&self) -> Result { diff --git a/core/storage/btree.rs b/core/storage/btree.rs index c1cd8be79..08f7e9a33 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -7449,7 +7449,7 @@ mod tests { #[allow(clippy::arc_with_non_send_sync)] pager .db_file - .write_page(current_page as usize, buf.clone(), Arc::new(c))?; + .write_page(current_page as usize, buf.clone(), c)?; pager.io.run_once()?; let page = cursor.read_page(current_page as usize)?; diff --git a/core/storage/database.rs b/core/storage/database.rs index e524f131b..6886d2e63 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -9,14 +9,14 @@ use std::{cell::RefCell, sync::Arc}; /// the storage medium. A database can either be a file on disk, like in SQLite, /// or something like a remote page server service. pub trait DatabaseStorage: Send + Sync { - fn read_page(&self, page_idx: usize, c: Arc) -> Result<()>; + fn read_page(&self, page_idx: usize, c: Completion) -> Result<()>; fn write_page( &self, page_idx: usize, buffer: Arc>, - c: Arc, + c: Completion, ) -> Result<()>; - fn sync(&self, c: Arc) -> Result<()>; + fn sync(&self, c: Completion) -> Result<()>; fn size(&self) -> Result; } @@ -32,7 +32,7 @@ unsafe impl Sync for DatabaseFile {} #[cfg(feature = "fs")] impl DatabaseStorage for DatabaseFile { - fn read_page(&self, page_idx: usize, c: Arc) -> Result<()> { + fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> { let r = c.as_read(); let size = r.buf().len(); assert!(page_idx > 0); @@ -48,7 +48,7 @@ impl DatabaseStorage for DatabaseFile { &self, page_idx: usize, buffer: Arc>, - c: Arc, + c: Completion, ) -> Result<()> { let buffer_size = buffer.borrow().len(); assert!(page_idx > 0); @@ -60,8 +60,13 @@ impl DatabaseStorage for DatabaseFile { Ok(()) } - fn sync(&self, c: Arc) -> Result<()> { - self.file.sync(c) + fn sync(&self, c: Completion) -> Result<()> { + let _ = self.file.sync(c)?; + Ok(()) + } + + fn size(&self) -> Result { + self.file.size() } fn size(&self) -> Result { @@ -84,7 +89,7 @@ unsafe impl Send for FileMemoryStorage {} unsafe impl Sync for FileMemoryStorage {} impl DatabaseStorage for FileMemoryStorage { - fn read_page(&self, page_idx: usize, c: Arc) -> Result<()> { + fn read_page(&self, page_idx: usize, c: Completion) -> Result<()> { let r = match c.completion_type { CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -103,7 +108,7 @@ impl DatabaseStorage for FileMemoryStorage { &self, page_idx: usize, buffer: Arc>, - c: Arc, + c: Completion, ) -> Result<()> { let buffer_size = buffer.borrow().len(); assert!(buffer_size >= 512); @@ -114,8 +119,9 @@ impl DatabaseStorage for FileMemoryStorage { Ok(()) } - fn sync(&self, c: Arc) -> Result<()> { - self.file.sync(c) + fn sync(&self, c: Completion) -> Result<()> { + let _ = self.file.sync(c)?; + Ok(()) } fn size(&self) -> Result { diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 440ec7091..4fbd1d8ac 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -748,7 +748,7 @@ pub fn begin_read_page( } }); let c = Completion::new(CompletionType::Read(ReadCompletion::new(buf, complete))); - db_file.read_page(page_idx, Arc::new(c))?; + db_file.read_page(page_idx, c)?; Ok(()) } @@ -806,7 +806,7 @@ pub fn begin_write_btree_page( }) }; let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); - page_source.write_page(page_id, buffer.clone(), Arc::new(c))?; + page_source.write_page(page_id, buffer.clone(), c)?; Ok(()) } @@ -819,7 +819,7 @@ pub fn begin_sync(db_file: Arc, syncing: Rc>) }), })); #[allow(clippy::arc_with_non_send_sync)] - db_file.sync(Arc::new(completion))?; + db_file.sync(completion)?; Ok(()) } @@ -1455,7 +1455,7 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result, header: &WalHeader) -> Result< }) }; #[allow(clippy::arc_with_non_send_sync)] - let c = Arc::new(Completion::new(CompletionType::Write( - WriteCompletion::new(write_complete), - ))); + let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); io.pwrite(0, buffer.clone(), c)?; Ok(()) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index fafd05b6d..4f4502388 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -879,7 +879,7 @@ impl Wal for WalFile { }), })); let shared = self.get_shared(); - shared.file.sync(Arc::new(completion))?; + shared.file.sync(completion)?; self.sync_state.set(SyncState::Syncing); Ok(WalFsyncStatus::IO) }