diff --git a/core/error.rs b/core/error.rs index 368cdb21c..f82200895 100644 --- a/core/error.rs +++ b/core/error.rs @@ -120,6 +120,8 @@ pub enum CompletionError { UringIOError(&'static str), #[error("Completion was aborted")] Aborted, + #[error("Decryption failed for page={page_idx}")] + DecryptionError { page_idx: usize }, } #[macro_export] diff --git a/core/storage/database.rs b/core/storage/database.rs index 1cd1a2d42..9658c65bf 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -103,25 +103,28 @@ impl DatabaseStorage for DatabaseFile { let Ok((buf, bytes_read)) = res else { return; }; - if bytes_read > 0 { - match encryption_ctx.decrypt_page(buf.as_slice(), page_idx) { - Ok(decrypted_data) => { - let original_buf = original_c.as_read().buf(); - original_buf.as_mut_slice().copy_from_slice(&decrypted_data); - original_c.complete(bytes_read); - } - Err(_) => { - tracing::error!( - "Failed to decrypt page data for page_id={page_idx}" - ); - original_c.complete(-1); - } + assert!( + bytes_read > 0, + "Expected to read some data on success for page_id={page_idx}" + ); + match encryption_ctx.decrypt_page(buf.as_slice(), page_idx) { + Ok(decrypted_data) => { + let original_buf = original_c.as_read().buf(); + original_buf.as_mut_slice().copy_from_slice(&decrypted_data); + original_c.complete(bytes_read); + } + Err(e) => { + tracing::error!( + "Failed to decrypt page data for page_id={page_idx}: {e}" + ); + assert!( + !original_c.has_error(), + "Original completion already has an error" + ); + original_c.error(CompletionError::DecryptionError { page_idx }); } - } else { - original_c.complete(bytes_read); } }); - let new_completion = Completion::new_read(read_buffer, decrypt_complete); self.file.pread(pos, new_completion) } else { diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index b3b0c8836..9019ee212 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -928,7 +928,7 @@ pub fn begin_read_page( #[instrument(skip_all, level = Level::INFO)] pub fn finish_read_page(page_idx: usize, buffer_ref: Arc, page: PageRef) { - tracing::trace!(page_idx); + tracing::trace!("finish_read_page(page_idx = {page_idx})"); let pos = if page_idx == DatabaseHeader::PAGE_ID { DatabaseHeader::SIZE } else { @@ -1851,14 +1851,52 @@ pub fn begin_read_wal_frame( offset: usize, buffer_pool: Arc, complete: Box, + page_idx: usize, + io_ctx: &IOContext, ) -> Result { - tracing::trace!("begin_read_wal_frame(offset={})", offset); + tracing::trace!( + "begin_read_wal_frame(offset={}, page_idx={})", + offset, + page_idx + ); let buf = buffer_pool.get_page(); let buf = Arc::new(buf); - #[allow(clippy::arc_with_non_send_sync)] - let c = Completion::new_read(buf, complete); - let c = io.pread(offset, c)?; - Ok(c) + + if let Some(ctx) = io_ctx.encryption_context() { + let encryption_ctx = ctx.clone(); + let original_complete = complete; + + let decrypt_complete = Box::new(move |res: Result<(Arc, i32), CompletionError>| { + let Ok((encrypted_buf, bytes_read)) = res else { + original_complete(res); + return; + }; + assert!( + bytes_read > 0, + "Expected to read some data on success for page_idx={page_idx}" + ); + match encryption_ctx.decrypt_page(encrypted_buf.as_slice(), page_idx) { + Ok(decrypted_data) => { + encrypted_buf + .as_mut_slice() + .copy_from_slice(&decrypted_data); + original_complete(Ok((encrypted_buf, bytes_read))); + } + Err(e) => { + tracing::error!( + "Failed to decrypt WAL frame data for page_idx={page_idx}: {e}" + ); + original_complete(Err(CompletionError::DecryptionError { page_idx })); + } + } + }); + + let new_completion = Completion::new_read(buf, decrypt_complete); + io.pread(offset, new_completion) + } else { + let c = Completion::new_read(buf, complete); + io.pread(offset, c) + } } pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 3f1d09571..f49039b38 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1064,10 +1064,6 @@ impl Wal for WalFile { page.set_locked(); let frame = page.clone(); let page_idx = page.get().id; - let encryption_ctx = { - let io_ctx = self.io_ctx.borrow(); - io_ctx.encryption_context().cloned() - }; let seq = self.header.checkpoint_seq; let complete = Box::new(move |res: Result<(Arc, i32), CompletionError>| { let Ok((buf, bytes_read)) = res else { @@ -1080,17 +1076,6 @@ impl Wal for WalFile { "read({bytes_read}) less than expected({buf_len}): frame_id={frame_id}" ); let cloned = frame.clone(); - if let Some(ctx) = encryption_ctx.clone() { - match ctx.decrypt_page(buf.as_slice(), page_idx) { - Ok(decrypted_data) => { - buf.as_mut_slice().copy_from_slice(&decrypted_data); - } - Err(_) => { - tracing::error!("Failed to decrypt page data for frame_id={frame_id}"); - return; - } - } - } finish_read_page(page.get().id, buf, cloned); frame.set_wal_tag(frame_id, seq); }); @@ -1099,6 +1084,8 @@ impl Wal for WalFile { offset + WAL_FRAME_HEADER_SIZE, buffer_pool, complete, + page_idx, + &self.io_ctx.borrow(), ) } @@ -1182,6 +1169,8 @@ impl Wal for WalFile { offset + WAL_FRAME_HEADER_SIZE, buffer_pool, complete, + page_id as usize, + &self.io_ctx.borrow(), )?; self.io.wait_for_completion(c)?; return if conflict.get() { @@ -2113,6 +2102,8 @@ impl WalFile { offset + WAL_FRAME_HEADER_SIZE, self.buffer_pool.clone(), complete, + page_id, + &self.io_ctx.borrow(), )?; Ok(InflightRead {