diff --git a/core/storage/database.rs b/core/storage/database.rs index 3b2dbbdf0..9658c65bf 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -117,9 +117,8 @@ impl DatabaseStorage for DatabaseFile { tracing::error!( "Failed to decrypt page data for page_id={page_idx}: {e}" ); - assert_eq!( - original_c.has_error(), - false, + assert!( + !original_c.has_error(), "Original completion already has an error" ); original_c.error(CompletionError::DecryptionError { page_idx }); diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index b3b0c8836..9019ee212 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -928,7 +928,7 @@ pub fn begin_read_page( #[instrument(skip_all, level = Level::INFO)] pub fn finish_read_page(page_idx: usize, buffer_ref: Arc, page: PageRef) { - tracing::trace!(page_idx); + tracing::trace!("finish_read_page(page_idx = {page_idx})"); let pos = if page_idx == DatabaseHeader::PAGE_ID { DatabaseHeader::SIZE } else { @@ -1851,14 +1851,52 @@ pub fn begin_read_wal_frame( offset: usize, buffer_pool: Arc, complete: Box, + page_idx: usize, + io_ctx: &IOContext, ) -> Result { - tracing::trace!("begin_read_wal_frame(offset={})", offset); + tracing::trace!( + "begin_read_wal_frame(offset={}, page_idx={})", + offset, + page_idx + ); let buf = buffer_pool.get_page(); let buf = Arc::new(buf); - #[allow(clippy::arc_with_non_send_sync)] - let c = Completion::new_read(buf, complete); - let c = io.pread(offset, c)?; - Ok(c) + + if let Some(ctx) = io_ctx.encryption_context() { + let encryption_ctx = ctx.clone(); + let original_complete = complete; + + let decrypt_complete = Box::new(move |res: Result<(Arc, i32), CompletionError>| { + let Ok((encrypted_buf, bytes_read)) = res else { + original_complete(res); + return; + }; + assert!( + bytes_read > 0, + "Expected to read some data on success for page_idx={page_idx}" + ); + match encryption_ctx.decrypt_page(encrypted_buf.as_slice(), page_idx) { + Ok(decrypted_data) => { + encrypted_buf + .as_mut_slice() + .copy_from_slice(&decrypted_data); + original_complete(Ok((encrypted_buf, bytes_read))); + } + Err(e) => { + tracing::error!( + "Failed to decrypt WAL frame data for page_idx={page_idx}: {e}" + ); + original_complete(Err(CompletionError::DecryptionError { page_idx })); + } + } + }); + + let new_completion = Completion::new_read(buf, decrypt_complete); + io.pread(offset, new_completion) + } else { + let c = Completion::new_read(buf, complete); + io.pread(offset, c) + } } pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 3f1d09571..f49039b38 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1064,10 +1064,6 @@ impl Wal for WalFile { page.set_locked(); let frame = page.clone(); let page_idx = page.get().id; - let encryption_ctx = { - let io_ctx = self.io_ctx.borrow(); - io_ctx.encryption_context().cloned() - }; let seq = self.header.checkpoint_seq; let complete = Box::new(move |res: Result<(Arc, i32), CompletionError>| { let Ok((buf, bytes_read)) = res else { @@ -1080,17 +1076,6 @@ impl Wal for WalFile { "read({bytes_read}) less than expected({buf_len}): frame_id={frame_id}" ); let cloned = frame.clone(); - if let Some(ctx) = encryption_ctx.clone() { - match ctx.decrypt_page(buf.as_slice(), page_idx) { - Ok(decrypted_data) => { - buf.as_mut_slice().copy_from_slice(&decrypted_data); - } - Err(_) => { - tracing::error!("Failed to decrypt page data for frame_id={frame_id}"); - return; - } - } - } finish_read_page(page.get().id, buf, cloned); frame.set_wal_tag(frame_id, seq); }); @@ -1099,6 +1084,8 @@ impl Wal for WalFile { offset + WAL_FRAME_HEADER_SIZE, buffer_pool, complete, + page_idx, + &self.io_ctx.borrow(), ) } @@ -1182,6 +1169,8 @@ impl Wal for WalFile { offset + WAL_FRAME_HEADER_SIZE, buffer_pool, complete, + page_id as usize, + &self.io_ctx.borrow(), )?; self.io.wait_for_completion(c)?; return if conflict.get() { @@ -2113,6 +2102,8 @@ impl WalFile { offset + WAL_FRAME_HEADER_SIZE, self.buffer_pool.clone(), complete, + page_id, + &self.io_ctx.borrow(), )?; Ok(InflightRead {