From 88ff2188107038babc50de9ffe727367c2b5bb89 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 14 Jul 2025 10:19:41 +0200 Subject: [PATCH] io: assert small I/O Let's assert **for now** that we do not read/write less bytes than expected. This should be fixed to retrigger several reads/writes if we couldn't read/write enough but for now let's assert. --- core/io/mod.rs | 8 +++---- core/io/unix.rs | 5 ++-- core/storage/sqlite3_ondisk.rs | 43 +++++++++++++++++++++------------- core/storage/wal.rs | 16 ++++++++++--- simulator/runner/file.rs | 6 ++--- 5 files changed, 50 insertions(+), 28 deletions(-) diff --git a/core/io/mod.rs b/core/io/mod.rs index 35c6b8f95..5ba5d2a27 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -54,7 +54,7 @@ pub trait IO: Clock + Send + Sync { fn get_memory_io(&self) -> Arc; } -pub type Complete = dyn Fn(Arc>); +pub type Complete = dyn Fn(Arc>, i32); pub type WriteComplete = dyn Fn(i32); pub type SyncComplete = dyn Fn(i32); @@ -88,7 +88,7 @@ impl Completion { pub fn complete(&self, result: i32) { match &self.completion_type { - CompletionType::Read(r) => r.complete(), + CompletionType::Read(r) => r.complete(result), CompletionType::Write(w) => w.complete(result), CompletionType::Sync(s) => s.complete(result), // fix }; @@ -126,8 +126,8 @@ impl ReadCompletion { self.buf.borrow_mut() } - pub fn complete(&self) { - (self.complete)(self.buf.clone()); + pub fn complete(&self, bytes_read: i32) { + (self.complete)(self.buf.clone(), bytes_read); } } diff --git a/core/io/unix.rs b/core/io/unix.rs index cfafdc6cb..a06923fd7 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -1,6 +1,6 @@ -use crate::error::LimboError; use crate::io::common; use crate::Result; +use crate::{error::LimboError, turso_assert}; use super::{Completion, File, MemoryIO, OpenFlags, IO}; use crate::io::clock::{Clock, Instant}; @@ -345,9 +345,10 @@ impl File for UnixFile<'_> { let c = Arc::new(c); match result { Ok(n) => { + let r = c.as_read(); trace!("pread n: {}", n); // Read succeeded immediately - c.complete(0); + c.complete(n as i32); Ok(c) } Err(Errno::AGAIN) => { diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index b2b545d36..5b6566d98 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -61,7 +61,7 @@ use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::pager::Pager; use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; -use crate::{File, Result, WalFileShared}; +use crate::{turso_assert, File, Result, WalFileShared}; use std::cell::{RefCell, UnsafeCell}; use std::collections::HashMap; use std::mem::MaybeUninit; @@ -732,7 +732,12 @@ pub fn begin_read_page( }); #[allow(clippy::arc_with_non_send_sync)] let buf = Arc::new(RefCell::new(Buffer::new(buf, drop_fn))); - let complete = Box::new(move |buf: Arc>| { + let complete = Box::new(move |buf: Arc>, bytes_read: i32| { + let buf_len = buf.borrow().len(); + turso_assert!( + bytes_read == buf_len as i32, + "read({bytes_read}) less than expected({buf_len})" + ); let page = page.clone(); if finish_read_page(page_idx, buf, page.clone()).is_err() { page.set_error(); @@ -793,9 +798,10 @@ pub fn begin_write_btree_page( *clone_counter.borrow_mut() -= 1; page_finish.clear_dirty(); - if bytes_written < buf_len as i32 { - tracing::error!("wrote({bytes_written}) less than expected({buf_len})"); - } + turso_assert!( + bytes_written == buf_len as i32, + "wrote({bytes_written}) less than expected({buf_len})" + ); }) }; let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); @@ -1330,9 +1336,14 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result = Box::new(move |buf: Arc>| { + let complete: Box = Box::new(move |buf: Arc>, bytes_read: i32| { let buf = buf.borrow(); let buf_slice = buf.as_slice(); + turso_assert!( + bytes_read == buf_slice.len() as i32, + "read({bytes_read}) less than expected({})", + buf_slice.len() + ); let mut header_locked = header.lock(); // Read header header_locked.magic = @@ -1489,7 +1500,7 @@ pub fn begin_read_wal_frame( io: &Arc, offset: usize, buffer_pool: Arc, - complete: Box>)>, + complete: Box>, i32)>, ) -> Result> { tracing::trace!("begin_read_wal_frame(offset={})", offset); let buf = buffer_pool.get(); @@ -1586,9 +1597,10 @@ pub fn begin_write_wal_frame( *clone_counter.borrow_mut() -= 1; page_finish.clear_dirty(); - if bytes_written < buf_len as i32 { - tracing::error!("wrote({bytes_written}) less than expected({buf_len})"); - } + turso_assert!( + bytes_written == buf_len as i32, + "wrote({bytes_written}) less than expected({buf_len})" + ); }) }; #[allow(clippy::arc_with_non_send_sync)] @@ -1608,7 +1620,7 @@ pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result< let buffer = { let drop_fn = Rc::new(|_buf| {}); - let mut buffer = Buffer::allocate(512, drop_fn); + let mut buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn); let buf = buffer.as_mut_slice(); buf[0..4].copy_from_slice(&header.magic.to_be_bytes()); @@ -1626,11 +1638,10 @@ pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result< let write_complete = { Box::new(move |bytes_written: i32| { - if bytes_written < WAL_HEADER_SIZE as i32 { - tracing::error!( - "wal header wrote({bytes_written}) less than expected({WAL_HEADER_SIZE})" - ); - } + turso_assert!( + bytes_written == WAL_HEADER_SIZE as i32, + "wal header wrote({bytes_written}) less than expected({WAL_HEADER_SIZE})" + ); }) }; #[allow(clippy::arc_with_non_send_sync)] diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 1836022fc..c7436fa5c 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -23,7 +23,7 @@ use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, finish_read_page, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; -use crate::{Buffer, LimboError, Result}; +use crate::{turso_assert, Buffer, LimboError, Result}; use crate::{Completion, Page}; use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE}; @@ -618,7 +618,12 @@ impl Wal for WalFile { let offset = self.frame_offset(frame_id); page.set_locked(); let frame = page.clone(); - let complete = Box::new(move |buf: Arc>| { + let complete = Box::new(move |buf: Arc>, bytes_read: i32| { + let buf_len = buf.borrow().len(); + turso_assert!( + bytes_read == buf_len as i32, + "read({bytes_read}) less than expected({buf_len})" + ); let frame = frame.clone(); finish_read_page(page.get().id, buf, frame).unwrap(); }); @@ -641,8 +646,13 @@ impl Wal for WalFile { ) -> Result> { tracing::debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); - let complete = Box::new(move |buf: Arc>| { + let complete = Box::new(move |buf: Arc>, bytes_read: i32| { let buf = buf.borrow(); + let buf_len = buf.len(); + turso_assert!( + bytes_read == buf_len as i32, + "read({bytes_read}) less than expected({buf_len})" + ); let buf_ptr = buf.as_ptr(); unsafe { std::ptr::copy_nonoverlapping(buf_ptr, frame, frame_len as usize); diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index c25c374eb..fbd80ea88 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -123,13 +123,13 @@ impl File for SimulatorFile { unreachable!(); }; let before = self.rng.borrow_mut().gen_bool(0.5); - let dummy_complete = Box::new(|_| {}); + let dummy_complete = Box::new(|_, _| {}); let prev_complete = std::mem::replace(&mut read_completion.complete, dummy_complete); - let new_complete = move |res| { + let new_complete = move |res, bytes_read| { if before { std::thread::sleep(latency); } - (prev_complete)(res); + (prev_complete)(res, bytes_read); if !before { std::thread::sleep(latency); }