fix overflow range read

This commit is contained in:
Pere Diaz Bou
2024-09-13 21:34:45 +02:00
parent c69e7db3eb
commit 6f30b67ec4
2 changed files with 172 additions and 23 deletions

View File

@@ -118,7 +118,13 @@ impl BTreeCursor {
},
}
}
let cell = page.cell_get(mem_page.cell_idx(), self.pager.clone())?;
let cell = page.cell_get(
mem_page.cell_idx(),
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.usable_space(),
)?;
match &cell {
BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page,
@@ -167,7 +173,13 @@ impl BTreeCursor {
let page = page.as_ref().unwrap();
for cell_idx in 0..page.cell_count() {
match &page.cell_get(cell_idx, self.pager.clone())? {
match &page.cell_get(
cell_idx,
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.usable_space(),
)? {
BTreeCell::TableLeafCell(TableLeafCell {
_rowid: cell_rowid,
_payload: p,
@@ -271,7 +283,13 @@ impl BTreeCursor {
let mut found_cell = false;
for cell_idx in 0..page.cell_count() {
match &page.cell_get(cell_idx, self.pager.clone())? {
match &page.cell_get(
cell_idx,
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.usable_space(),
)? {
BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page,
_rowid,
@@ -471,7 +489,12 @@ impl BTreeCursor {
}
fn drop_cell(&mut self, page: &mut PageContent, cell_idx: usize) {
let (cell_start, cell_len) = page.cell_get_raw_region(cell_idx);
let (cell_start, cell_len) = page.cell_get_raw_region(
cell_idx,
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.usable_space(),
);
self.free_cell_range(page, cell_start as u16, cell_len as u16);
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
}
@@ -535,7 +558,12 @@ impl BTreeCursor {
let mut scratch_cells: Vec<&[u8]> = Vec::new();
for cell_idx in 0..page_copy.cell_count() {
let (start, len) = page_copy.cell_get_raw_region(cell_idx);
let (start, len) = page_copy.cell_get_raw_region(
cell_idx,
self.max_local(page_copy.page_type()),
self.min_local(page_copy.page_type()),
self.usable_space(),
);
let buf = page_copy.as_ptr();
scratch_cells.push(&buf[start..start + len]);
}
@@ -565,6 +593,7 @@ impl BTreeCursor {
let right_page = right_page.as_mut().unwrap();
{
let is_leaf = page.is_leaf();
let page_type = page.page_type();
let mut new_pages = vec![page, right_page];
let new_pages_ids = vec![mem_page.page_idx, right_page_id];
trace!(
@@ -593,7 +622,15 @@ impl BTreeCursor {
// Right page pointer is u32 in right most pointer, and in cell is u32 too, so we can use a *u32 to hold where we want to change this value
let mut right_pointer = BTREE_HEADER_OFFSET_RIGHTMOST;
for cell_idx in 0..parent.cell_count() {
let cell = parent.cell_get(cell_idx, self.pager.clone()).unwrap();
let cell = parent
.cell_get(
cell_idx,
self.pager.clone(),
self.max_local(page_type.clone()),
self.min_local(page_type.clone()),
self.usable_space(),
)
.unwrap();
let found = match cell {
BTreeCell::TableInteriorCell(interior) => {
interior._left_child_page as usize == mem_page.page_idx
@@ -601,7 +638,12 @@ impl BTreeCursor {
_ => unreachable!("Parent should always be a "),
};
if found {
let (start, len) = parent.cell_get_raw_region(cell_idx);
let (start, len) = parent.cell_get_raw_region(
cell_idx,
self.max_local(page_type.clone()),
self.min_local(page_type.clone()),
self.usable_space(),
);
right_pointer = start;
break;
}
@@ -651,7 +693,13 @@ impl BTreeCursor {
for page in new_pages.iter_mut().take(new_pages_len - 1) {
assert!(page.cell_count() == 1);
let last_cell = page
.cell_get(page.cell_count() - 1, self.pager.clone())
.cell_get(
page.cell_count() - 1,
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.usable_space(),
)
.unwrap();
let last_cell_pointer = match last_cell {
BTreeCell::TableInteriorCell(interior) => interior._left_child_page,
@@ -674,9 +722,17 @@ impl BTreeCursor {
assert!(page.cell_count() > 1);
let divider_cell_index = divider_cells_index[page_id_index];
let cell_payload = scratch_cells[divider_cell_index];
let cell =
read_btree_cell(cell_payload, &page.page_type(), 0, self.pager.clone())
.unwrap();
let cell = read_btree_cell(
cell_payload,
&page.page_type(),
0,
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.usable_space(),
)
.unwrap();
if is_leaf {
// create a new divider cell and push
let key = match cell {
@@ -1116,7 +1172,16 @@ impl BTreeCursor {
let mut cell_idx = 0;
let cell_count = page.cell_count();
while cell_idx < cell_count {
match page.cell_get(cell_idx, self.pager.clone()).unwrap() {
match page
.cell_get(
cell_idx,
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.usable_space(),
)
.unwrap()
{
BTreeCell::TableLeafCell(cell) => {
if int_key <= cell._rowid {
break;
@@ -1292,7 +1357,13 @@ impl Cursor for BTreeCursor {
if cell_idx >= page.cell_count() {
Ok(CursorResult::Ok(false))
} else {
let equals = match &page.cell_get(cell_idx, self.pager.clone())? {
let equals = match &page.cell_get(
cell_idx,
self.pager.clone(),
self.max_local(page.page_type()),
self.min_local(page.page_type()),
self.usable_space(),
)? {
BTreeCell::TableLeafCell(l) => l._rowid == int_key,
_ => unreachable!(),
};

View File

@@ -357,7 +357,14 @@ impl PageContent {
}
}
pub fn cell_get(&self, idx: usize, pager: Rc<Pager>) -> Result<BTreeCell> {
pub fn cell_get(
&self,
idx: usize,
pager: Rc<Pager>,
max_local: usize,
min_local: usize,
usable_size: usize,
) -> Result<BTreeCell> {
let buf = self.as_ptr();
let ncells = self.cell_count();
@@ -371,7 +378,15 @@ impl PageContent {
let cell_pointer = cell_start + (idx * 2);
let cell_pointer = self.read_u16(cell_pointer) as usize;
read_btree_cell(buf, &self.page_type(), cell_pointer, pager)
read_btree_cell(
buf,
&self.page_type(),
cell_pointer,
pager,
max_local,
min_local,
usable_size,
)
}
pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) {
@@ -385,7 +400,13 @@ impl PageContent {
}
/* Get region of a cell's payload */
pub fn cell_get_raw_region(&self, idx: usize) -> (usize, usize) {
pub fn cell_get_raw_region(
&self,
idx: usize,
max_local: usize,
min_local: usize,
usable_size: usize,
) -> (usize, usize) {
let buf = self.as_ptr();
let ncells = self.cell_count();
let cell_start = match self.page_type() {
@@ -401,7 +422,13 @@ impl PageContent {
let len = match self.page_type() {
PageType::IndexInterior => {
let (len_payload, n_payload) = read_varint(&buf[cell_pointer + 4..]).unwrap();
4 + len_payload as usize + n_payload + 4
let (overflows, to_read) =
payload_overflows(len_payload as usize, max_local, min_local, usable_size);
if overflows {
4 + to_read + n_payload + 4
} else {
4 + len_payload as usize + n_payload + 4
}
}
PageType::TableInterior => {
let (_, n_rowid) = read_varint(&buf[cell_pointer + 4..]).unwrap();
@@ -409,13 +436,24 @@ impl PageContent {
}
PageType::IndexLeaf => {
let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap();
len_payload as usize + n_payload + 4
let (overflows, to_read) =
payload_overflows(len_payload as usize, max_local, min_local, usable_size);
if overflows {
to_read as usize + n_payload + 4
} else {
len_payload as usize + n_payload + 4
}
}
PageType::TableLeaf => {
let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap();
let (_, n_rowid) = read_varint(&buf[cell_pointer + n_payload..]).unwrap();
// TODO: add overflow 4 bytes
len_payload as usize + n_payload + n_rowid
let (overflows, to_read) =
payload_overflows(len_payload as usize, max_local, min_local, usable_size);
if overflows {
to_read + n_payload + n_rowid
} else {
len_payload as usize + n_payload + n_rowid
}
}
};
(start, len)
@@ -553,6 +591,9 @@ pub fn read_btree_cell(
page_type: &PageType,
pos: usize,
pager: Rc<Pager>,
max_local: usize,
min_local: usize,
usable_size: usize,
) -> Result<BTreeCell> {
match page_type {
PageType::IndexInterior => {
@@ -562,8 +603,13 @@ pub fn read_btree_cell(
pos += 4;
let (payload_size, nr) = read_varint(&page[pos..])?;
pos += nr;
let (overflows, to_read) =
payload_overflows(payload_size as usize, max_local, min_local, usable_size);
let to_read = if overflows { to_read } else { page.len() - pos };
let (payload, first_overflow_page) =
read_payload(&page[pos..], payload_size as usize, pager);
read_payload(&page[pos..pos + to_read], payload_size as usize, pager);
Ok(BTreeCell::IndexInteriorCell(IndexInteriorCell {
left_child_page,
payload,
@@ -585,8 +631,13 @@ pub fn read_btree_cell(
let mut pos = pos;
let (payload_size, nr) = read_varint(&page[pos..])?;
pos += nr;
let (overflows, to_read) =
payload_overflows(payload_size as usize, max_local, min_local, usable_size);
let to_read = if overflows { to_read } else { page.len() - pos };
let (payload, first_overflow_page) =
read_payload(&page[pos..], payload_size as usize, pager);
read_payload(&page[pos..pos + to_read], payload_size as usize, pager);
Ok(BTreeCell::IndexLeafCell(IndexLeafCell {
payload,
first_overflow_page,
@@ -598,8 +649,13 @@ pub fn read_btree_cell(
pos += nr;
let (rowid, nr) = read_varint(&page[pos..])?;
pos += nr;
let (overflows, to_read) =
payload_overflows(payload_size as usize, max_local, min_local, usable_size);
let to_read = if overflows { to_read } else { page.len() - pos };
let (payload, first_overflow_page) =
read_payload(&page[pos..], payload_size as usize, pager);
read_payload(&page[pos..pos + to_read], payload_size as usize, pager);
Ok(BTreeCell::TableLeafCell(TableLeafCell {
_rowid: rowid,
_payload: payload,
@@ -938,6 +994,28 @@ fn finish_read_wal_frame_header(
Ok(())
}
/*
Checks if payload will overflow a cell based on max local and
it will return the min size that will be stored in that case,
including overflow pointer
*/
pub fn payload_overflows(
payload_size: usize,
max_local: usize,
min_local: usize,
usable_size: usize,
) -> (bool, usize) {
if payload_size <= max_local {
return (false, 0);
}
let mut space_left = min_local + (payload_size - min_local) % (usable_size - 4);
if space_left > max_local {
space_left = min_local;
}
return (true, space_left + 4);
}
#[cfg(test)]
mod tests {
use super::*;