diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 1602749f4..99712bd62 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -584,8 +584,8 @@ impl DatabaseFile { impl limbo_core::DatabaseStorage for DatabaseFile { fn read_page(&self, page_idx: usize, c: Arc) -> limbo_core::Result<()> { - let r = match *c { - limbo_core::Completion::Read(ref r) => r, + let r = match c.completion_type { + limbo_core::CompletionType::Read(ref r) => r, _ => unreachable!(), }; let size = r.buf().len(); diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index adbae3afa..e1c18edc2 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -212,8 +212,8 @@ impl limbo_core::File for File { } fn pread(&self, pos: usize, c: Arc) -> Result<()> { - let r = match *c { - limbo_core::Completion::Read(ref r) => r, + let r = match c.completion_type { + limbo_core::CompletionType::Read(ref r) => r, _ => unreachable!(), }; { @@ -232,8 +232,8 @@ impl limbo_core::File for File { buffer: Arc>, c: Arc, ) -> Result<()> { - let w = match *c { - limbo_core::Completion::Write(ref w) => w, + let w = match c.completion_type { + limbo_core::CompletionType::Write(ref w) => w, _ => unreachable!(), }; let buf = buffer.borrow(); @@ -337,8 +337,8 @@ impl DatabaseFile { impl limbo_core::DatabaseStorage for DatabaseFile { fn read_page(&self, page_idx: usize, c: Arc) -> Result<()> { - let r = match *c { - limbo_core::Completion::Read(ref r) => r, + let r = match c.completion_type { + limbo_core::CompletionType::Read(ref r) => r, _ => unreachable!(), }; let size = r.buf().len(); diff --git a/core/io/generic.rs b/core/io/generic.rs index 11dfe9971..aaaf2e305 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -1,5 +1,5 @@ use super::MemoryIO; -use crate::{Clock, Completion, File, Instant, LimboError, OpenFlags, Result, IO}; +use crate::{Clock, Completion, CompletionType, File, Instant, LimboError, OpenFlags, Result, IO}; use std::cell::RefCell; use std::io::{Read, Seek, Write}; use std::sync::Arc; @@ -90,8 +90,8 @@ impl File for GenericFile { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { - let r = match *c { - Completion::Read(ref r) => r, + let r = match c.completion_type { + CompletionType::Read(ref r) => r, _ => unreachable!(), }; let mut buf = r.buf_mut(); @@ -102,7 +102,12 @@ impl File for GenericFile { Ok(()) } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { + fn pwrite( + &self, + pos: usize, + buffer: Arc>, + c: Arc, + ) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let buf = buffer.borrow(); diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 5f0045888..307a7a2f2 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -2,6 +2,7 @@ use super::{common, Completion, File, OpenFlags, WriteCompletion, IO}; use crate::io::clock::{Clock, Instant}; +use crate::io::CompletionType; use crate::{LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; use rustix::io_uring::iovec; @@ -291,7 +292,12 @@ impl File for UringFile { Ok(()) } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { + fn pwrite( + &self, + pos: usize, + buffer: Arc>, + c: Arc, + ) -> Result<()> { let mut io = self.io.borrow_mut(); let fd = io_uring::types::Fd(self.file.as_raw_fd()); let write = { @@ -305,11 +311,13 @@ impl File for UringFile { }; io.ring.submit_entry( &write, - Arc::new(Completion::Write(WriteCompletion::new(Box::new(move |result| { - c.complete(result); - // NOTE: Explicitly reference buffer to ensure it lives until here - let _ = buffer.borrow(); - })))), + Arc::new(Completion::new(CompletionType::Write( + WriteCompletion::new(Box::new(move |result| { + c.complete(result); + // NOTE: Explicitly reference buffer to ensure it lives until here + let _ = buffer.borrow(); + })), + ))), ); Ok(()) } diff --git a/core/io/mod.rs b/core/io/mod.rs index 73fec5246..0985a8070 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -53,7 +53,12 @@ pub type Complete = dyn Fn(Arc>); pub type WriteComplete = dyn Fn(i32); pub type SyncComplete = dyn Fn(i32); -pub enum Completion { +pub struct Completion { + pub completion_type: CompletionType, + is_completed: Cell, +} + +pub enum CompletionType { Read(ReadCompletion), Write(WriteCompletion), Sync(SyncCompletion), @@ -62,31 +67,34 @@ pub enum Completion { pub struct ReadCompletion { pub buf: Arc>, pub complete: Box, - pub is_completed: Cell, } impl Completion { - pub fn is_completed(&self) -> bool { - match self { - Self::Read(r) => r.is_completed.get(), - Self::Write(w) => w.is_completed.get(), - Self::Sync(s) => s.is_completed.get(), + pub fn new(completion_type: CompletionType) -> Self { + Self { + completion_type, + is_completed: Cell::new(false), } } + pub fn is_completed(&self) -> bool { + self.is_completed.get() + } + pub fn complete(&self, result: i32) { - match self { - Self::Read(r) => r.complete(), - Self::Write(w) => w.complete(result), - Self::Sync(s) => s.complete(result), // fix - } + match &self.completion_type { + CompletionType::Read(r) => r.complete(), + CompletionType::Write(w) => w.complete(result), + CompletionType::Sync(s) => s.complete(result), // fix + }; + self.is_completed.set(true); } /// only call this method if you are sure that the completion is /// a ReadCompletion, panics otherwise pub fn as_read(&self) -> &ReadCompletion { - match self { - Self::Read(ref r) => r, + match self.completion_type { + CompletionType::Read(ref r) => r, _ => unreachable!(), } } @@ -94,21 +102,15 @@ impl Completion { pub struct WriteCompletion { pub complete: Box, - pub is_completed: Cell, } pub struct SyncCompletion { pub complete: Box, - pub is_completed: Cell, } impl ReadCompletion { pub fn new(buf: Arc>, complete: Box) -> Self { - Self { - buf, - complete, - is_completed: Cell::new(false), - } + Self { buf, complete } } pub fn buf(&self) -> Ref<'_, Buffer> { @@ -121,35 +123,26 @@ impl ReadCompletion { pub fn complete(&self) { (self.complete)(self.buf.clone()); - self.is_completed.set(true); } } impl WriteCompletion { pub fn new(complete: Box) -> Self { - Self { - complete, - is_completed: Cell::new(false), - } + Self { complete } } pub fn complete(&self, bytes_written: i32) { (self.complete)(bytes_written); - self.is_completed.set(true); } } impl SyncCompletion { pub fn new(complete: Box) -> Self { - Self { - complete, - is_completed: Cell::new(false), - } + Self { complete } } pub fn complete(&self, res: i32) { (self.complete)(res); - self.is_completed.set(true); } } diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 7aae691e6..9838bbcbf 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -1,6 +1,7 @@ use super::{Buffer, Completion, File, MemoryIO, OpenFlags, IO}; use crate::ext::VfsMod; use crate::io::clock::{Clock, Instant}; +use crate::io::CompletionType; use crate::{LimboError, Result}; use limbo_ext::{VfsFileImpl, VfsImpl}; use std::cell::RefCell; @@ -98,8 +99,8 @@ impl File for VfsFileImpl { } fn pread(&self, pos: usize, c: Arc) -> Result<()> { - let r = match &*c { - Completion::Read(ref r) => r, + let r = match c.completion_type { + CompletionType::Read(ref r) => r, _ => unreachable!(), }; let result = { diff --git a/core/lib.rs b/core/lib.rs index 4c0bd22f1..bd85aa47f 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -46,7 +46,8 @@ pub use io::UnixIO; #[cfg(all(feature = "fs", target_os = "linux", feature = "io_uring"))] pub use io::UringIO; pub use io::{ - Buffer, Completion, File, MemoryIO, OpenFlags, PlatformIO, SyscallIO, WriteCompletion, IO, + Buffer, Completion, CompletionType, File, MemoryIO, OpenFlags, PlatformIO, SyscallIO, + WriteCompletion, IO, }; use limbo_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser}; use parking_lot::RwLock; diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 68f0189b7..c1cd8be79 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -6518,7 +6518,7 @@ mod tests { use super::*; use crate::{ - io::{Buffer, Completion, MemoryIO, OpenFlags, IO}, + io::{Buffer, Completion, CompletionType, MemoryIO, OpenFlags, IO}, storage::{database::DatabaseFile, page_cache::DumbLruPageCache}, types::Text, vdbe::Register, @@ -7445,7 +7445,7 @@ mod tests { drop_fn, ))); let write_complete = Box::new(|_| {}); - let c = Completion::Write(WriteCompletion::new(write_complete)); + let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); #[allow(clippy::arc_with_non_send_sync)] pager .db_file diff --git a/core/storage/database.rs b/core/storage/database.rs index 2f3e7c9b5..e524f131b 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -1,4 +1,5 @@ use crate::error::LimboError; +use crate::io::CompletionType; use crate::{io::Completion, Buffer, Result}; use std::{cell::RefCell, sync::Arc}; @@ -84,8 +85,8 @@ unsafe impl Sync for FileMemoryStorage {} impl DatabaseStorage for FileMemoryStorage { fn read_page(&self, page_idx: usize, c: Arc) -> Result<()> { - let r = match *c { - Completion::Read(ref r) => r, + let r = match c.completion_type { + CompletionType::Read(ref r) => r, _ => unreachable!(), }; let size = r.buf().len(); diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index d1562f285..440ec7091 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -47,7 +47,9 @@ use tracing::{instrument, Level}; use crate::error::LimboError; use crate::fast_lock::SpinLock; -use crate::io::{Buffer, Complete, Completion, ReadCompletion, SyncCompletion, WriteCompletion}; +use crate::io::{ + Buffer, Complete, Completion, CompletionType, ReadCompletion, SyncCompletion, WriteCompletion, +}; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::pager::Pager; @@ -55,7 +57,7 @@ use crate::types::{ ImmutableRecord, RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype, }; use crate::{File, Result, WalFileShared}; -use std::cell::{Cell, RefCell, UnsafeCell}; +use std::cell::{RefCell, UnsafeCell}; use std::collections::HashMap; use std::mem::MaybeUninit; use std::pin::Pin; @@ -745,7 +747,7 @@ pub fn begin_read_page( page.set_error(); } }); - let c = Completion::Read(ReadCompletion::new(buf, complete)); + let c = Completion::new(CompletionType::Read(ReadCompletion::new(buf, complete))); db_file.read_page(page_idx, Arc::new(c))?; Ok(()) } @@ -803,7 +805,7 @@ pub fn begin_write_btree_page( } }) }; - let c = Completion::Write(WriteCompletion::new(write_complete)); + let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); page_source.write_page(page_id, buffer.clone(), Arc::new(c))?; Ok(()) } @@ -811,12 +813,11 @@ pub fn begin_write_btree_page( pub fn begin_sync(db_file: Arc, syncing: Rc>) -> Result<()> { assert!(!*syncing.borrow()); *syncing.borrow_mut() = true; - let completion = Completion::Sync(SyncCompletion { + let completion = Completion::new(CompletionType::Sync(SyncCompletion { complete: Box::new(move |_| { *syncing.borrow_mut() = false; }), - is_completed: Cell::new(false), - }); + })); #[allow(clippy::arc_with_non_send_sync)] db_file.sync(Arc::new(completion))?; Ok(()) @@ -1450,7 +1451,10 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result, header: &WalHeader) -> Result< }) }; #[allow(clippy::arc_with_non_send_sync)] - let c = Arc::new(Completion::Write(WriteCompletion::new(write_complete))); + let c = Arc::new(Completion::new(CompletionType::Write( + WriteCompletion::new(write_complete), + ))); io.pwrite(0, buffer.clone(), c)?; Ok(()) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index a67a0ddae..fafd05b6d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -17,7 +17,7 @@ use std::{ }; use crate::fast_lock::SpinLock; -use crate::io::{File, SyncCompletion, IO}; +use crate::io::{CompletionType, File, SyncCompletion, IO}; use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, finish_read_page, WAL_FRAME_HEADER_SIZE, @@ -872,13 +872,12 @@ impl Wal for WalFile { tracing::debug!("wal_sync"); let syncing = self.syncing.clone(); self.syncing.set(true); - let completion = Completion::Sync(SyncCompletion { + let completion = Completion::new(CompletionType::Sync(SyncCompletion { complete: Box::new(move |_| { tracing::debug!("wal_sync finish"); syncing.set(false); }), - is_completed: Cell::new(false), - }); + })); let shared = self.get_shared(); shared.file.sync(Arc::new(completion))?; self.sync_state.set(SyncState::Syncing);