Merge 'Propagate decryption error from the callback' from Avinash Sajjanshetty

Reviewed-by: Pedro Muniz (@pedrocarlo)

Closes #2843
This commit is contained in:
Pekka Enberg
2025-08-30 08:40:47 +03:00
committed by GitHub
4 changed files with 71 additions and 37 deletions

View File

@@ -120,6 +120,8 @@ pub enum CompletionError {
UringIOError(&'static str),
#[error("Completion was aborted")]
Aborted,
#[error("Decryption failed for page={page_idx}")]
DecryptionError { page_idx: usize },
}
#[macro_export]

View File

@@ -103,25 +103,28 @@ impl DatabaseStorage for DatabaseFile {
let Ok((buf, bytes_read)) = res else {
return;
};
if bytes_read > 0 {
match encryption_ctx.decrypt_page(buf.as_slice(), page_idx) {
Ok(decrypted_data) => {
let original_buf = original_c.as_read().buf();
original_buf.as_mut_slice().copy_from_slice(&decrypted_data);
original_c.complete(bytes_read);
}
Err(_) => {
tracing::error!(
"Failed to decrypt page data for page_id={page_idx}"
);
original_c.complete(-1);
}
assert!(
bytes_read > 0,
"Expected to read some data on success for page_id={page_idx}"
);
match encryption_ctx.decrypt_page(buf.as_slice(), page_idx) {
Ok(decrypted_data) => {
let original_buf = original_c.as_read().buf();
original_buf.as_mut_slice().copy_from_slice(&decrypted_data);
original_c.complete(bytes_read);
}
Err(e) => {
tracing::error!(
"Failed to decrypt page data for page_id={page_idx}: {e}"
);
assert!(
!original_c.has_error(),
"Original completion already has an error"
);
original_c.error(CompletionError::DecryptionError { page_idx });
}
} else {
original_c.complete(bytes_read);
}
});
let new_completion = Completion::new_read(read_buffer, decrypt_complete);
self.file.pread(pos, new_completion)
} else {

View File

@@ -928,7 +928,7 @@ pub fn begin_read_page(
#[instrument(skip_all, level = Level::INFO)]
pub fn finish_read_page(page_idx: usize, buffer_ref: Arc<Buffer>, page: PageRef) {
tracing::trace!(page_idx);
tracing::trace!("finish_read_page(page_idx = {page_idx})");
let pos = if page_idx == DatabaseHeader::PAGE_ID {
DatabaseHeader::SIZE
} else {
@@ -1851,14 +1851,52 @@ pub fn begin_read_wal_frame(
offset: usize,
buffer_pool: Arc<BufferPool>,
complete: Box<ReadComplete>,
page_idx: usize,
io_ctx: &IOContext,
) -> Result<Completion> {
tracing::trace!("begin_read_wal_frame(offset={})", offset);
tracing::trace!(
"begin_read_wal_frame(offset={}, page_idx={})",
offset,
page_idx
);
let buf = buffer_pool.get_page();
let buf = Arc::new(buf);
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new_read(buf, complete);
let c = io.pread(offset, c)?;
Ok(c)
if let Some(ctx) = io_ctx.encryption_context() {
let encryption_ctx = ctx.clone();
let original_complete = complete;
let decrypt_complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((encrypted_buf, bytes_read)) = res else {
original_complete(res);
return;
};
assert!(
bytes_read > 0,
"Expected to read some data on success for page_idx={page_idx}"
);
match encryption_ctx.decrypt_page(encrypted_buf.as_slice(), page_idx) {
Ok(decrypted_data) => {
encrypted_buf
.as_mut_slice()
.copy_from_slice(&decrypted_data);
original_complete(Ok((encrypted_buf, bytes_read)));
}
Err(e) => {
tracing::error!(
"Failed to decrypt WAL frame data for page_idx={page_idx}: {e}"
);
original_complete(Err(CompletionError::DecryptionError { page_idx }));
}
}
});
let new_completion = Completion::new_read(buf, decrypt_complete);
io.pread(offset, new_completion)
} else {
let c = Completion::new_read(buf, complete);
io.pread(offset, c)
}
}
pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) {

View File

@@ -1064,10 +1064,6 @@ impl Wal for WalFile {
page.set_locked();
let frame = page.clone();
let page_idx = page.get().id;
let encryption_ctx = {
let io_ctx = self.io_ctx.borrow();
io_ctx.encryption_context().cloned()
};
let seq = self.header.checkpoint_seq;
let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((buf, bytes_read)) = res else {
@@ -1080,17 +1076,6 @@ impl Wal for WalFile {
"read({bytes_read}) less than expected({buf_len}): frame_id={frame_id}"
);
let cloned = frame.clone();
if let Some(ctx) = encryption_ctx.clone() {
match ctx.decrypt_page(buf.as_slice(), page_idx) {
Ok(decrypted_data) => {
buf.as_mut_slice().copy_from_slice(&decrypted_data);
}
Err(_) => {
tracing::error!("Failed to decrypt page data for frame_id={frame_id}");
return;
}
}
}
finish_read_page(page.get().id, buf, cloned);
frame.set_wal_tag(frame_id, seq);
});
@@ -1099,6 +1084,8 @@ impl Wal for WalFile {
offset + WAL_FRAME_HEADER_SIZE,
buffer_pool,
complete,
page_idx,
&self.io_ctx.borrow(),
)
}
@@ -1182,6 +1169,8 @@ impl Wal for WalFile {
offset + WAL_FRAME_HEADER_SIZE,
buffer_pool,
complete,
page_id as usize,
&self.io_ctx.borrow(),
)?;
self.io.wait_for_completion(c)?;
return if conflict.get() {
@@ -2113,6 +2102,8 @@ impl WalFile {
offset + WAL_FRAME_HEADER_SIZE,
self.buffer_pool.clone(),
complete,
page_id,
&self.io_ctx.borrow(),
)?;
Ok(InflightRead {