From 83b214a4e3fa7187acc55313b00a6dc7ef310383 Mon Sep 17 00:00:00 2001 From: krishvishal Date: Sun, 26 Jan 2025 17:07:30 +0530 Subject: [PATCH] Added `clear_over_pages` it deletes all the overflow pages related to a cell. --- core/storage/btree.rs | 94 ++++++++++++++++++++++++++++++++-- core/storage/sqlite3_ondisk.rs | 2 +- 2 files changed, 92 insertions(+), 4 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 5f53112e1..d9aa4fa62 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -6,7 +6,7 @@ use crate::storage::sqlite3_ondisk::{ TableInteriorCell, TableLeafCell, }; use crate::types::{CursorResult, OwnedRecord, OwnedValue, SeekKey, SeekOp}; -use crate::Result; +use crate::{LimboError, Result}; use std::cell::{Ref, RefCell}; use std::pin::Pin; @@ -14,7 +14,8 @@ use std::rc::Rc; use super::pager::PageRef; use super::sqlite3_ondisk::{ - write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, DATABASE_HEADER_SIZE, + payload_overflows, write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, + DATABASE_HEADER_SIZE, }; /* @@ -739,7 +740,6 @@ impl BTreeCursor { // TODO: if overwrite drop cell // insert cell - let mut cell_payload: Vec = Vec::new(); self.fill_cell_payload(page_type, Some(int_key), &mut cell_payload, record); @@ -1899,6 +1899,94 @@ impl BTreeCursor { let id = page.get().id; id as u32 } + + fn clear_overflow_pages(&self, cell: &BTreeCell) -> Result> { + // Get overflow info based on cell type + let (first_overflow_page, n_overflow) = match cell { + BTreeCell::TableLeafCell(leaf_cell) => { + match self.calculate_overflow_info(leaf_cell._payload.len(), PageType::TableLeaf)? { + Some(n_overflow) => (leaf_cell.first_overflow_page, n_overflow), + None => return Ok(CursorResult::Ok(())), + } + } + BTreeCell::IndexLeafCell(leaf_cell) => { + match self.calculate_overflow_info(leaf_cell.payload.len(), PageType::IndexLeaf)? { + Some(n_overflow) => (leaf_cell.first_overflow_page, n_overflow), + None => return Ok(CursorResult::Ok(())), + } + } + BTreeCell::IndexInteriorCell(interior_cell) => { + match self + .calculate_overflow_info(interior_cell.payload.len(), PageType::IndexInterior)? + { + Some(n_overflow) => (interior_cell.first_overflow_page, n_overflow), + None => return Ok(CursorResult::Ok(())), + } + } + BTreeCell::TableInteriorCell(_) => return Ok(CursorResult::Ok(())), // No overflow pages + }; + + let Some(first_page) = first_overflow_page else { + return Ok(CursorResult::Ok(())); + }; + let page_count = self.database_header.borrow().database_size as usize; + let mut pages_left = n_overflow; + let mut current_page = first_page; + // Clear overflow pages + while pages_left > 0 { + pages_left -= 1; + + // Validate overflow page number + if current_page < 2 || current_page as usize > page_count { + return Err(LimboError::Corrupt("Invalid overflow page number".into())); + } + + let page = self.pager.read_page(current_page as usize)?; + return_if_locked!(page); + let contents = page.get().contents.as_ref().unwrap(); + + let next_page = if pages_left > 0 { + contents.read_u32(0) + } else { + 0 + }; + + // Free the current page + self.pager.free_page(Some(page), current_page as usize)?; + + current_page = next_page; + } + Ok(CursorResult::Ok(())) + } + + fn calculate_overflow_info( + &self, + payload_len: usize, + page_type: PageType, + ) -> Result> { + let max_local = self.payload_overflow_threshold_max(page_type.clone()); + let min_local = self.payload_overflow_threshold_min(page_type.clone()); + let usable_size = self.usable_space(); + + let (_, local_size) = payload_overflows(payload_len, max_local, min_local, usable_size); + + assert!( + local_size != payload_len, + "Trying to clear overflow pages when there are no overflow pages" + ); + + // Calculate expected overflow pages + let overflow_page_size = self.usable_space() - 4; + let n_overflow = + (payload_len - local_size + overflow_page_size).div_ceil(overflow_page_size); + if n_overflow == 0 { + return Err(LimboError::Corrupt("Invalid overflow calculation".into())); + } + + Ok(Some(n_overflow)) + } + + } impl PageStack { diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index e86d89520..c5b8852ce 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -439,7 +439,7 @@ impl PageContent { u16::from_be_bytes([buf[self.offset + pos], buf[self.offset + pos + 1]]) } - fn read_u32(&self, pos: usize) -> u32 { + pub fn read_u32(&self, pos: usize) -> u32 { let buf = self.as_ptr(); u32::from_be_bytes([ buf[self.offset + pos],