diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 7c572ede4..0f29dee3e 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -201,13 +201,20 @@ enum ReadPayloadOverflow { } enum PayloadOverflowWithOffset { - ProcessPage { - payload: Vec, + SkipOverflowPages { next_page: u32, - remaining_to_read: usize, + pages_left_to_skip: u32, + page_offset: u32, + amount: u32, + buffer_offset: usize, + is_write: bool, + }, + ProcessPage { + next_page: u32, + remaining_to_read: u32, page: PageRef, current_offset: usize, - buffset_offset: usize, + buffer_offset: usize, is_write: bool, }, } @@ -298,6 +305,7 @@ impl WriteInfo { enum CursorState { None, Read(ReadPayloadOverflow), + ReadWritePayload(PayloadOverflowWithOffset), Write(WriteInfo), Destroy(DestroyInfo), Delete(DeleteInfo), @@ -779,6 +787,15 @@ impl BTreeCursor { mut amount: u32, is_write: bool, ) -> Result> { + if let CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages { + .. + }) + | CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { .. }) = + &self.state + { + return self.continue_payload_overflow_with_offset(buffer, self.usable_space()); + } + let page = self.stack.top(); return_if_locked_maybe_load!(self.pager, page); @@ -824,20 +841,178 @@ impl BTreeCursor { local_size, offset, amount ); println!("cell"); + let mut bytes_processed: u32 = 0; if offset < local_size as u32 { - let mut a: u32 = amount; - if a + offset > local_size as u32 { - a = local_size as u32 - offset; + let mut local_amount: u32 = amount; + if local_amount + offset > local_size as u32 { + local_amount = local_size as u32 - offset; } self.read_write_payload_to_page(offset, payload, page, buffer, amount, is_write); offset = 0; - amount -= a; + amount -= local_amount; + bytes_processed += local_amount; } else { offset -= local_size as u32; } + + if amount > 0 { + if first_overflow_page.is_none() { + return Err(LimboError::Corrupt( + "Expected overflow page but none found".into(), + )); + } + + let overflow_size = usable_size - 4; + let pages_to_skip = offset / overflow_size as u32; + let page_offset = offset % overflow_size as u32; + + self.state = + CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages { + next_page: first_overflow_page.unwrap(), + pages_left_to_skip: pages_to_skip, + page_offset: page_offset, + amount: amount, + buffer_offset: bytes_processed as usize, + is_write, + }); + + return Ok(CursorResult::IO); + } Ok(CursorResult::Ok(())) } + pub fn continue_payload_overflow_with_offset( + &mut self, + buffer: &mut Vec, + usable_space: usize, + ) -> Result> { + loop { + let mut state = std::mem::replace(&mut self.state, CursorState::None); + + match &mut state { + CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages { + next_page, + pages_left_to_skip, + page_offset, + amount, + buffer_offset, + is_write, + }) => { + if *pages_left_to_skip == 0 { + let page = self.pager.read_page(*next_page as usize)?; + return_if_locked_maybe_load!(self.pager, page); + self.state = + CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { + next_page: *next_page, + remaining_to_read: *amount, + page: page, + current_offset: *page_offset as usize, + buffer_offset: *buffer_offset, + is_write: *is_write, + }); + + continue; + } + + let page = self.pager.read_page(*next_page as usize)?; + return_if_locked_maybe_load!(self.pager, page); + let contents = page.get_contents(); + let next = contents.read_u32_no_offset(0); + + if next == 0 { + return Err(LimboError::Corrupt( + "Overflow chain ends prematurely".into(), + )); + } + *next_page = next; + *pages_left_to_skip -= 1; + + self.state = CursorState::ReadWritePayload( + PayloadOverflowWithOffset::SkipOverflowPages { + next_page: next, + pages_left_to_skip: *pages_left_to_skip, + page_offset: *page_offset, + amount: *amount, + buffer_offset: *buffer_offset, + is_write: *is_write, + }, + ); + + return Ok(CursorResult::IO); + } + + CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { + next_page, + remaining_to_read, + page, + current_offset, + buffer_offset, + is_write, + }) => { + if page.is_locked() { + self.state = + CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { + next_page: *next_page, + remaining_to_read: *remaining_to_read, + page: page.clone(), + current_offset: *current_offset, + buffer_offset: *buffer_offset, + is_write: *is_write, + }); + + return Ok(CursorResult::IO); + } + + let contents = page.get_contents(); + let overflow_size = usable_space - 4; + + let page_offset = *current_offset; + let bytes_to_process = std::cmp::min( + *remaining_to_read, + overflow_size as u32 - page_offset as u32, + ); + + let payload_offset = 4 + page_offset; + let page_payload = contents.as_ptr(); + self.read_write_payload_to_page( + payload_offset as u32, + page_payload, + page.clone(), + buffer, + bytes_to_process, + *is_write, + ); + *remaining_to_read -= bytes_to_process; + *buffer_offset += bytes_to_process as usize; + + if *remaining_to_read == 0 { + self.state = CursorState::None; + return Ok(CursorResult::Ok(())); + } + let next = contents.read_u32_no_offset(0); + if next == 0 { + return Err(LimboError::Corrupt( + "Overflow chain ends prematurely".into(), + )); + } + + // Load next page + *next_page = next; + *current_offset = 0; // Reset offset for new page + *page = self.pager.read_page(next as usize)?; + + // Return IO to allow other operations + return Ok(CursorResult::IO); + } + _ => { + return Err(LimboError::InternalError( + "Invalid state for continue_payload_overflow_with_offset".into(), + )) + } + } + } + } + fn read_write_payload_to_page( &mut self, payload_offset: u32,