Extend read_write_payload_with_offset to handle offset into overflow pages.
krishvishal
2025-05-13 15:28:58 +05:30
parent cffb731e4c
commit 93ad65e512

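For orientation, here is a minimal sketch of the offset arithmetic the patch introduces (the helper name split_overflow_offset is illustrative and not part of the commit; overflow_size, pages_to_skip, and page_offset mirror the names in the diff below):

    fn split_overflow_offset(offset_past_local: u32, usable_size: usize) -> (u32, u32) {
        // Each overflow page stores usable_size - 4 payload bytes; the first
        // 4 bytes hold the next page number in the chain.
        let overflow_size = (usable_size - 4) as u32;
        // Whole pages lying entirely before the requested offset are skipped.
        let pages_to_skip = offset_past_local / overflow_size;
        // Offset into the first overflow page that is actually read or written.
        let page_offset = offset_past_local % overflow_size;
        (pages_to_skip, page_offset)
    }

The SkipOverflowPages state walks pages_to_skip links through the chain, and ProcessPage then copies bytes starting at page_offset, yielding CursorResult::IO whenever a page load is pending.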

@@ -201,13 +201,20 @@ enum ReadPayloadOverflow {
}
enum PayloadOverflowWithOffset {
SkipOverflowPages {
next_page: u32,
pages_left_to_skip: u32,
page_offset: u32,
amount: u32,
buffer_offset: usize,
is_write: bool,
},
ProcessPage {
next_page: u32,
remaining_to_read: u32,
page: PageRef,
current_offset: usize,
buffer_offset: usize,
is_write: bool,
},
}
@@ -298,6 +305,7 @@ impl WriteInfo {
enum CursorState {
None,
Read(ReadPayloadOverflow),
ReadWritePayload(PayloadOverflowWithOffset),
Write(WriteInfo),
Destroy(DestroyInfo),
Delete(DeleteInfo),
@@ -779,6 +787,15 @@ impl BTreeCursor {
mut amount: u32,
is_write: bool,
) -> Result<CursorResult<()>> {
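// If a previous call yielded while walking overflow pages, resume from the saved state instead of starting over.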
if let CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages {
..
})
| CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { .. }) =
&self.state
{
return self.continue_payload_overflow_with_offset(buffer, self.usable_space());
}
let page = self.stack.top();
return_if_locked_maybe_load!(self.pager, page);
@@ -824,20 +841,178 @@ impl BTreeCursor {
local_size, offset, amount
);
println!("cell");
let mut bytes_processed: u32 = 0;
if offset < local_size as u32 {
let mut local_amount: u32 = amount;
if local_amount + offset > local_size as u32 {
local_amount = local_size as u32 - offset;
}
self.read_write_payload_to_page(offset, payload, page, buffer, local_amount, is_write);
offset = 0;
amount -= local_amount;
bytes_processed += local_amount;
} else {
offset -= local_size as u32;
}
if amount > 0 {
if first_overflow_page.is_none() {
return Err(LimboError::Corrupt(
"Expected overflow page but none found".into(),
));
}
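// Each overflow page carries usable_size - 4 payload bytes after its 4-byte next-page
// pointer, so the remaining offset splits into whole pages to skip plus an offset into
// the first page actually touched.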
let overflow_size = usable_size - 4;
let pages_to_skip = offset / overflow_size as u32;
let page_offset = offset % overflow_size as u32;
self.state =
CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages {
next_page: first_overflow_page.unwrap(),
pages_left_to_skip: pages_to_skip,
page_offset,
amount,
buffer_offset: bytes_processed as usize,
is_write,
});
return Ok(CursorResult::IO);
}
Ok(CursorResult::Ok(()))
}
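/// Resumes the two-phase overflow traversal saved in CursorState::ReadWritePayload,
/// advancing one page per CursorResult::IO until the requested range is fully processed.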
pub fn continue_payload_overflow_with_offset(
&mut self,
buffer: &mut Vec<u8>,
usable_space: usize,
) -> Result<CursorResult<()>> {
loop {
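// Take the state out of the cursor; each arm stores an updated state back before yielding.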
let mut state = std::mem::replace(&mut self.state, CursorState::None);
match &mut state {
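// Phase 1: follow the overflow chain, skipping pages that lie entirely before the requested offset.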
CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages {
next_page,
pages_left_to_skip,
page_offset,
amount,
buffer_offset,
is_write,
}) => {
if *pages_left_to_skip == 0 {
let page = self.pager.read_page(*next_page as usize)?;
return_if_locked_maybe_load!(self.pager, page);
self.state =
CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage {
next_page: *next_page,
remaining_to_read: *amount,
page,
current_offset: *page_offset as usize,
buffer_offset: *buffer_offset,
is_write: *is_write,
});
continue;
}
let page = self.pager.read_page(*next_page as usize)?;
return_if_locked_maybe_load!(self.pager, page);
let contents = page.get_contents();
let next = contents.read_u32_no_offset(0);
if next == 0 {
return Err(LimboError::Corrupt(
"Overflow chain ends prematurely".into(),
));
}
*next_page = next;
*pages_left_to_skip -= 1;
self.state = CursorState::ReadWritePayload(
PayloadOverflowWithOffset::SkipOverflowPages {
next_page: next,
pages_left_to_skip: *pages_left_to_skip,
page_offset: *page_offset,
amount: *amount,
buffer_offset: *buffer_offset,
is_write: *is_write,
},
);
return Ok(CursorResult::IO);
}
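// Phase 2: read or write within the current overflow page, moving to the next page
// until the requested amount is processed.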
CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage {
next_page,
remaining_to_read,
page,
current_offset,
buffer_offset,
is_write,
}) => {
if page.is_locked() {
self.state =
CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage {
next_page: *next_page,
remaining_to_read: *remaining_to_read,
page: page.clone(),
current_offset: *current_offset,
buffer_offset: *buffer_offset,
is_write: *is_write,
});
return Ok(CursorResult::IO);
}
let contents = page.get_contents();
let overflow_size = usable_space - 4;
let page_offset = *current_offset;
let bytes_to_process = std::cmp::min(
*remaining_to_read,
overflow_size as u32 - page_offset as u32,
);
let payload_offset = 4 + page_offset;
let page_payload = contents.as_ptr();
self.read_write_payload_to_page(
payload_offset as u32,
page_payload,
page.clone(),
buffer,
bytes_to_process,
*is_write,
);
*remaining_to_read -= bytes_to_process;
*buffer_offset += bytes_to_process as usize;
if *remaining_to_read == 0 {
self.state = CursorState::None;
return Ok(CursorResult::Ok(()));
}
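// The first four bytes of an overflow page hold the page number of the next page in the chain.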
let next = contents.read_u32_no_offset(0);
if next == 0 {
return Err(LimboError::Corrupt(
"Overflow chain ends prematurely".into(),
));
}
// Advance to the next page in the chain and reset the in-page offset.
*next_page = next;
*current_offset = 0;
*page = self.pager.read_page(next as usize)?;
// Store the updated state back so the next call resumes here, then
// yield while the page load completes.
self.state =
CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage {
next_page: *next_page,
remaining_to_read: *remaining_to_read,
page: page.clone(),
current_offset: 0,
buffer_offset: *buffer_offset,
is_write: *is_write,
});
return Ok(CursorResult::IO);
}
_ => {
return Err(LimboError::InternalError(
"Invalid state for continue_payload_overflow_with_offset".into(),
))
}
}
}
}
fn read_write_payload_to_page(
&mut self,
payload_offset: u32,