From 9c75872827b2d57f4328d43a5e46c78832bf89a7 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 18 Jul 2025 14:46:19 -0400 Subject: [PATCH] Make writing wal header able to be sync --- bindings/javascript/src/lib.rs | 9 ++++++--- bindings/wasm/lib.rs | 18 +++++++++--------- core/io/generic.rs | 2 +- core/io/io_uring.rs | 2 +- core/io/memory.rs | 2 +- core/io/mod.rs | 2 +- core/io/unix.rs | 4 ++-- core/io/vfs.rs | 2 +- core/io/windows.rs | 2 +- core/storage/database.rs | 12 +++++------- core/storage/sqlite3_ondisk.rs | 3 ++- core/storage/wal.rs | 32 ++++++++++++++++---------------- simulator/runner/file.rs | 6 +----- 13 files changed, 47 insertions(+), 49 deletions(-) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 7a8ed46c1..470bdfbc0 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -726,9 +726,12 @@ impl turso_core::DatabaseStorage for DatabaseFile { fn size(&self) -> turso_core::Result { self.file.size() } - fn truncate(&self, len: usize, c: Arc) -> turso_core::Result<()> { - let _ = self.file.truncate(len, c)?; - Ok(()) + fn truncate( + &self, + len: usize, + c: turso_core::Completion, + ) -> turso_core::Result { + self.file.truncate(len, c) } } diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 2e9e49984..1996d073f 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -216,8 +216,8 @@ impl turso_core::File for File { fn pread( &self, pos: usize, - c: Arc, - ) -> Result> { + c: turso_core::Completion, + ) -> Result { let r = match c.completion_type { turso_core::CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -236,8 +236,8 @@ impl turso_core::File for File { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: turso_core::Completion, + ) -> Result { let w = match c.completion_type { turso_core::CompletionType::Write(ref w) => w, _ => unreachable!(), @@ -250,7 +250,7 @@ impl turso_core::File for File { Ok(c) } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: turso_core::Completion) -> Result { self.vfs.sync(self.fd); c.complete(0); #[allow(clippy::arc_with_non_send_sync)] @@ -264,8 +264,8 @@ impl turso_core::File for File { fn truncate( &self, len: usize, - c: Arc, - ) -> Result> { + c: turso_core::Completion, + ) -> Result { self.vfs.truncate(self.fd, len); c.complete(0); #[allow(clippy::arc_with_non_send_sync)] @@ -305,7 +305,7 @@ impl turso_core::IO for PlatformIO { })) } - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: turso_core::Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } @@ -391,7 +391,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { self.file.size() } - fn truncate(&self, len: usize, c: Arc) -> Result<()> { + fn truncate(&self, len: usize, c: turso_core::Completion) -> Result<()> { self.file.truncate(len, c)?; Ok(()) } diff --git a/core/io/generic.rs b/core/io/generic.rs index cb70ea068..5bfda1db0 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -121,7 +121,7 @@ impl File for GenericFile { Ok(c) } - fn truncate(&self, len: usize, c: Arc) -> Result> { + fn truncate(&self, len: usize, c: Completion) -> Result { let mut file = self.file.borrow_mut(); file.set_len(len as u64) .map_err(|err| LimboError::IOError(err))?; diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 4b02aa516..fe3e8d6dd 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -354,7 +354,7 @@ impl File for UringFile { Ok(self.file.metadata()?.len()) } - fn truncate(&self, len: usize, c: Arc) -> Result> { + fn truncate(&self, len: usize, c: Completion) -> Result { let mut io = self.io.borrow_mut(); let truncate = with_fd!(self, |fd| { io_uring::opcode::Ftruncate::new(fd, len as u64) diff --git a/core/io/memory.rs b/core/io/memory.rs index df9161453..7dbf05d50 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -174,7 +174,7 @@ impl File for MemoryFile { Ok(c) } - fn truncate(&self, len: usize, c: Arc) -> Result> { + fn truncate(&self, len: usize, c: Completion) -> Result { if len < self.size.get() { // Truncate pages unsafe { diff --git a/core/io/mod.rs b/core/io/mod.rs index c1b2e8ac7..82ef51313 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -19,7 +19,7 @@ pub trait File: Send + Sync { -> Result; fn sync(&self, c: Completion) -> Result; fn size(&self) -> Result; - fn truncate(&self, len: usize, c: Arc) -> Result>; + fn truncate(&self, len: usize, c: Completion) -> Result; } #[derive(Debug, Copy, Clone, PartialEq)] diff --git a/core/io/unix.rs b/core/io/unix.rs index 0d388044b..cf7245b88 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -452,8 +452,8 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::INFO)] - fn truncate(&self, len: usize, c: Arc) -> Result> { - let file = self.file.borrow(); + fn truncate(&self, len: usize, c: Completion) -> Result { + let file = self.file.lock().unwrap(); let result = file.set_len(len as u64); match result { Ok(()) => { diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 25a52263a..beb535aff 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -166,7 +166,7 @@ impl File for VfsFileImpl { } } - fn truncate(&self, len: usize, c: Arc) -> Result> { + fn truncate(&self, len: usize, c: Completion) -> Result { if self.vfs.is_null() { return Err(LimboError::ExtensionError("VFS is null".to_string())); } diff --git a/core/io/windows.rs b/core/io/windows.rs index 2ae910b9b..b7c5b06cc 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -123,7 +123,7 @@ impl File for WindowsFile { } #[instrument(err, skip_all, level = Level::TRACE)] - fn truncate(&self, len: usize, c: Arc) -> Result> { + fn truncate(&self, len: usize, c: Completion) -> Result { let mut file = self.file.borrow_mut(); file.set_len(len as u64).map_err(LimboError::IOError)?; c.complete(0); diff --git a/core/storage/database.rs b/core/storage/database.rs index c7560f8fe..1ff74c95e 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -18,7 +18,7 @@ pub trait DatabaseStorage: Send + Sync { ) -> Result; fn sync(&self, c: Completion) -> Result; fn size(&self) -> Result; - fn truncate(&self, len: usize, c: Arc) -> Result<()>; + fn truncate(&self, len: usize, c: Completion) -> Result; } #[cfg(feature = "fs")] @@ -72,9 +72,8 @@ impl DatabaseStorage for DatabaseFile { } #[instrument(skip_all, level = Level::INFO)] - fn truncate(&self, len: usize, c: Arc) -> Result<()> { - self.file.truncate(len, c)?; - Ok(()) + fn truncate(&self, len: usize, c: Completion) -> Result { + self.file.truncate(len, c) } } @@ -131,9 +130,8 @@ impl DatabaseStorage for FileMemoryStorage { } #[instrument(skip_all, level = Level::INFO)] - fn truncate(&self, len: usize, c: Arc) -> Result<()> { - let _ = self.file.truncate(len, c)?; - Ok(()) + fn truncate(&self, len: usize, c: Completion) -> Result { + self.file.truncate(len, c) } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 46cb738b4..829f049b6 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1695,7 +1695,8 @@ pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result< }; #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_write(write_complete); - io.pwrite(0, buffer.clone(), c) + let c = io.pwrite(0, buffer.clone(), c.clone())?; + Ok(c) } /// Checks if payload will overflow a cell based on the maximum allowed size. diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 9cb997f9f..d99724c95 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1101,10 +1101,14 @@ impl Wal for WalFile { let shared = shared.clone(); let shared = unsafe { shared.get().as_mut().unwrap() }; let io = io.clone(); - let _ = shared.restart_wal_header(&io); + shared.restart_wal_header(&io).unwrap(); }); let shared = self.get_shared(); - shared.file.truncate(WAL_HEADER_SIZE, c.into())?; + let c_cloned = c.clone(); + let c = shared.file.truncate(WAL_HEADER_SIZE, c_cloned.clone())?; + // ensure that the header is written and not just this completion fires + self.io.wait_for_completion(c_cloned)?; + self.io.wait_for_completion(c)?; Ok(()) } } @@ -1260,19 +1264,14 @@ impl WalFileShared { let c = sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; // TODO: for now wait for completion io.wait_for_completion(c)?; - let header = Arc::new(SpinLock::new(wal_header)); - let checksum = { - let checksum = header.lock(); - (checksum.checksum_1, checksum.checksum_2) - }; - tracing::debug!("new_shared(header={:?})", header); + tracing::debug!("new_shared(header={:?})", wal_header); let shared = WalFileShared { - wal_header: header, + wal_header: Arc::new(SpinLock::new(wal_header)), min_frame: AtomicU64::new(0), max_frame: AtomicU64::new(0), nbackfills: AtomicU64::new(0), frame_cache: Arc::new(SpinLock::new(HashMap::new())), - last_checksum: checksum, + last_checksum: (checksums.0, checksums.1), file, pages_in_frames: Arc::new(SpinLock::new(Vec::new())), read_locks: array::from_fn(|_| LimboRwLock { @@ -1326,16 +1325,17 @@ impl WalFileShared { hdr.salt_2 = io.generate_random_number() as u32; // rewrite header on disk - sqlite3_ondisk::begin_write_wal_header(&self.file, &hdr)?; + let c = sqlite3_ondisk::begin_write_wal_header(&self.file, &hdr)?; + // TODO: for now wait for completion + io.wait_for_completion(c)?; - // clear per‑page caches self.frame_cache.lock().clear(); self.pages_in_frames.lock().clear(); self.last_checksum = (hdr.checksum_1, hdr.checksum_2); // reset read‑marks - self.read_locks[0].value.store(0, Ordering::SeqCst); // always 0 - self.read_locks[1].value.store(0, Ordering::SeqCst); // available + self.read_locks[0].value.store(0, Ordering::SeqCst); + self.read_locks[1].value.store(0, Ordering::SeqCst); for lock in &self.read_locks[2..] { lock.value.store(READMARK_NOT_USED, Ordering::SeqCst); } @@ -1377,10 +1377,10 @@ pub mod test { let _done = done.clone(); let _ = file.file.truncate( WAL_HEADER_SIZE, - Arc::new(Completion::new_trunc(move |_| { + Completion::new_trunc(move |_| { let done = _done.clone(); done.set(true); - })), + }), ); assert!(file.file.size().unwrap() == WAL_HEADER_SIZE as u64); assert!(done.get()); diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 928b51d5a..ba3680333 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -226,11 +226,7 @@ impl File for SimulatorFile { self.inner.size() } - fn truncate( - &self, - len: usize, - c: Arc, - ) -> Result> { + fn truncate(&self, len: usize, c: turso_core::Completion) -> Result { if self.fault.get() { return Err(turso_core::LimboError::InternalError( FAULT_ERROR_MSG.into(),