From 6f30b67ec41c57becc82c6942a9f90dda09d59ef Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 13 Sep 2024 21:34:45 +0200 Subject: [PATCH] fix overflow range read --- core/storage/btree.rs | 97 ++++++++++++++++++++++++++++----- core/storage/sqlite3_ondisk.rs | 98 ++++++++++++++++++++++++++++++---- 2 files changed, 172 insertions(+), 23 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 7d3af851a..f66933034 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -118,7 +118,13 @@ impl BTreeCursor { }, } } - let cell = page.cell_get(mem_page.cell_idx(), self.pager.clone())?; + let cell = page.cell_get( + mem_page.cell_idx(), + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + )?; match &cell { BTreeCell::TableInteriorCell(TableInteriorCell { _left_child_page, @@ -167,7 +173,13 @@ impl BTreeCursor { let page = page.as_ref().unwrap(); for cell_idx in 0..page.cell_count() { - match &page.cell_get(cell_idx, self.pager.clone())? { + match &page.cell_get( + cell_idx, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + )? { BTreeCell::TableLeafCell(TableLeafCell { _rowid: cell_rowid, _payload: p, @@ -271,7 +283,13 @@ impl BTreeCursor { let mut found_cell = false; for cell_idx in 0..page.cell_count() { - match &page.cell_get(cell_idx, self.pager.clone())? { + match &page.cell_get( + cell_idx, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + )? { BTreeCell::TableInteriorCell(TableInteriorCell { _left_child_page, _rowid, @@ -471,7 +489,12 @@ impl BTreeCursor { } fn drop_cell(&mut self, page: &mut PageContent, cell_idx: usize) { - let (cell_start, cell_len) = page.cell_get_raw_region(cell_idx); + let (cell_start, cell_len) = page.cell_get_raw_region( + cell_idx, + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + ); self.free_cell_range(page, cell_start as u16, cell_len as u16); page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1); } @@ -535,7 +558,12 @@ impl BTreeCursor { let mut scratch_cells: Vec<&[u8]> = Vec::new(); for cell_idx in 0..page_copy.cell_count() { - let (start, len) = page_copy.cell_get_raw_region(cell_idx); + let (start, len) = page_copy.cell_get_raw_region( + cell_idx, + self.max_local(page_copy.page_type()), + self.min_local(page_copy.page_type()), + self.usable_space(), + ); let buf = page_copy.as_ptr(); scratch_cells.push(&buf[start..start + len]); } @@ -565,6 +593,7 @@ impl BTreeCursor { let right_page = right_page.as_mut().unwrap(); { let is_leaf = page.is_leaf(); + let page_type = page.page_type(); let mut new_pages = vec![page, right_page]; let new_pages_ids = vec![mem_page.page_idx, right_page_id]; trace!( @@ -593,7 +622,15 @@ impl BTreeCursor { // Right page pointer is u32 in right most pointer, and in cell is u32 too, so we can use a *u32 to hold where we want to change this value let mut right_pointer = BTREE_HEADER_OFFSET_RIGHTMOST; for cell_idx in 0..parent.cell_count() { - let cell = parent.cell_get(cell_idx, self.pager.clone()).unwrap(); + let cell = parent + .cell_get( + cell_idx, + self.pager.clone(), + self.max_local(page_type.clone()), + self.min_local(page_type.clone()), + self.usable_space(), + ) + .unwrap(); let found = match cell { BTreeCell::TableInteriorCell(interior) => { interior._left_child_page as usize == mem_page.page_idx @@ -601,7 +638,12 @@ impl BTreeCursor { _ => unreachable!("Parent should always be a "), }; if found { - let (start, len) = parent.cell_get_raw_region(cell_idx); + let (start, len) = parent.cell_get_raw_region( + cell_idx, + self.max_local(page_type.clone()), + self.min_local(page_type.clone()), + self.usable_space(), + ); right_pointer = start; break; } @@ -651,7 +693,13 @@ impl BTreeCursor { for page in new_pages.iter_mut().take(new_pages_len - 1) { assert!(page.cell_count() == 1); let last_cell = page - .cell_get(page.cell_count() - 1, self.pager.clone()) + .cell_get( + page.cell_count() - 1, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + ) .unwrap(); let last_cell_pointer = match last_cell { BTreeCell::TableInteriorCell(interior) => interior._left_child_page, @@ -674,9 +722,17 @@ impl BTreeCursor { assert!(page.cell_count() > 1); let divider_cell_index = divider_cells_index[page_id_index]; let cell_payload = scratch_cells[divider_cell_index]; - let cell = - read_btree_cell(cell_payload, &page.page_type(), 0, self.pager.clone()) - .unwrap(); + let cell = read_btree_cell( + cell_payload, + &page.page_type(), + 0, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + ) + .unwrap(); + if is_leaf { // create a new divider cell and push let key = match cell { @@ -1116,7 +1172,16 @@ impl BTreeCursor { let mut cell_idx = 0; let cell_count = page.cell_count(); while cell_idx < cell_count { - match page.cell_get(cell_idx, self.pager.clone()).unwrap() { + match page + .cell_get( + cell_idx, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + ) + .unwrap() + { BTreeCell::TableLeafCell(cell) => { if int_key <= cell._rowid { break; @@ -1292,7 +1357,13 @@ impl Cursor for BTreeCursor { if cell_idx >= page.cell_count() { Ok(CursorResult::Ok(false)) } else { - let equals = match &page.cell_get(cell_idx, self.pager.clone())? { + let equals = match &page.cell_get( + cell_idx, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + )? { BTreeCell::TableLeafCell(l) => l._rowid == int_key, _ => unreachable!(), }; diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 7bbeea346..dc80339af 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -357,7 +357,14 @@ impl PageContent { } } - pub fn cell_get(&self, idx: usize, pager: Rc) -> Result { + pub fn cell_get( + &self, + idx: usize, + pager: Rc, + max_local: usize, + min_local: usize, + usable_size: usize, + ) -> Result { let buf = self.as_ptr(); let ncells = self.cell_count(); @@ -371,7 +378,15 @@ impl PageContent { let cell_pointer = cell_start + (idx * 2); let cell_pointer = self.read_u16(cell_pointer) as usize; - read_btree_cell(buf, &self.page_type(), cell_pointer, pager) + read_btree_cell( + buf, + &self.page_type(), + cell_pointer, + pager, + max_local, + min_local, + usable_size, + ) } pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) { @@ -385,7 +400,13 @@ impl PageContent { } /* Get region of a cell's payload */ - pub fn cell_get_raw_region(&self, idx: usize) -> (usize, usize) { + pub fn cell_get_raw_region( + &self, + idx: usize, + max_local: usize, + min_local: usize, + usable_size: usize, + ) -> (usize, usize) { let buf = self.as_ptr(); let ncells = self.cell_count(); let cell_start = match self.page_type() { @@ -401,7 +422,13 @@ impl PageContent { let len = match self.page_type() { PageType::IndexInterior => { let (len_payload, n_payload) = read_varint(&buf[cell_pointer + 4..]).unwrap(); - 4 + len_payload as usize + n_payload + 4 + let (overflows, to_read) = + payload_overflows(len_payload as usize, max_local, min_local, usable_size); + if overflows { + 4 + to_read + n_payload + 4 + } else { + 4 + len_payload as usize + n_payload + 4 + } } PageType::TableInterior => { let (_, n_rowid) = read_varint(&buf[cell_pointer + 4..]).unwrap(); @@ -409,13 +436,24 @@ impl PageContent { } PageType::IndexLeaf => { let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap(); - len_payload as usize + n_payload + 4 + let (overflows, to_read) = + payload_overflows(len_payload as usize, max_local, min_local, usable_size); + if overflows { + to_read as usize + n_payload + 4 + } else { + len_payload as usize + n_payload + 4 + } } PageType::TableLeaf => { let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap(); let (_, n_rowid) = read_varint(&buf[cell_pointer + n_payload..]).unwrap(); - // TODO: add overflow 4 bytes - len_payload as usize + n_payload + n_rowid + let (overflows, to_read) = + payload_overflows(len_payload as usize, max_local, min_local, usable_size); + if overflows { + to_read + n_payload + n_rowid + } else { + len_payload as usize + n_payload + n_rowid + } } }; (start, len) @@ -553,6 +591,9 @@ pub fn read_btree_cell( page_type: &PageType, pos: usize, pager: Rc, + max_local: usize, + min_local: usize, + usable_size: usize, ) -> Result { match page_type { PageType::IndexInterior => { @@ -562,8 +603,13 @@ pub fn read_btree_cell( pos += 4; let (payload_size, nr) = read_varint(&page[pos..])?; pos += nr; + + let (overflows, to_read) = + payload_overflows(payload_size as usize, max_local, min_local, usable_size); + let to_read = if overflows { to_read } else { page.len() - pos }; + let (payload, first_overflow_page) = - read_payload(&page[pos..], payload_size as usize, pager); + read_payload(&page[pos..pos + to_read], payload_size as usize, pager); Ok(BTreeCell::IndexInteriorCell(IndexInteriorCell { left_child_page, payload, @@ -585,8 +631,13 @@ pub fn read_btree_cell( let mut pos = pos; let (payload_size, nr) = read_varint(&page[pos..])?; pos += nr; + + let (overflows, to_read) = + payload_overflows(payload_size as usize, max_local, min_local, usable_size); + let to_read = if overflows { to_read } else { page.len() - pos }; + let (payload, first_overflow_page) = - read_payload(&page[pos..], payload_size as usize, pager); + read_payload(&page[pos..pos + to_read], payload_size as usize, pager); Ok(BTreeCell::IndexLeafCell(IndexLeafCell { payload, first_overflow_page, @@ -598,8 +649,13 @@ pub fn read_btree_cell( pos += nr; let (rowid, nr) = read_varint(&page[pos..])?; pos += nr; + + let (overflows, to_read) = + payload_overflows(payload_size as usize, max_local, min_local, usable_size); + let to_read = if overflows { to_read } else { page.len() - pos }; + let (payload, first_overflow_page) = - read_payload(&page[pos..], payload_size as usize, pager); + read_payload(&page[pos..pos + to_read], payload_size as usize, pager); Ok(BTreeCell::TableLeafCell(TableLeafCell { _rowid: rowid, _payload: payload, @@ -938,6 +994,28 @@ fn finish_read_wal_frame_header( Ok(()) } +/* + Checks if payload will overflow a cell based on max local and + it will return the min size that will be stored in that case, + including overflow pointer +*/ +pub fn payload_overflows( + payload_size: usize, + max_local: usize, + min_local: usize, + usable_size: usize, +) -> (bool, usize) { + if payload_size <= max_local { + return (false, 0); + } + + let mut space_left = min_local + (payload_size - min_local) % (usable_size - 4); + if space_left > max_local { + space_left = min_local; + } + return (true, space_left + 4); +} + #[cfg(test)] mod tests { use super::*;