From a9cb8157b548c3632ef40959bd6745ecc50a7eda Mon Sep 17 00:00:00 2001 From: gandeevanr Date: Wed, 7 Aug 2024 09:03:24 -0700 Subject: [PATCH 1/2] initial pass at implementing NewRowId --- core/pseudo.rs | 4 +++ core/storage/btree.rs | 68 ++++++++++++++++++++++++++++++++++++++++ core/translate/insert.rs | 2 +- core/types.rs | 1 + core/vdbe/explain.rs | 8 ++--- core/vdbe/mod.rs | 15 +++++++-- core/vdbe/sorter.rs | 4 +++ 7 files changed, 95 insertions(+), 7 deletions(-) diff --git a/core/pseudo.rs b/core/pseudo.rs index b05fa834f..7f4d94e08 100644 --- a/core/pseudo.rs +++ b/core/pseudo.rs @@ -50,6 +50,10 @@ impl Cursor for PseudoCursor { unimplemented!(); } + fn seek_to_last(&mut self) -> Result> { + unimplemented!(); + } + fn record(&self) -> Result>> { Ok(self.current.borrow()) } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index e1fe41990..0bf6f4559 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -72,6 +72,18 @@ impl BTreeCursor { } } + fn is_empty_table(&mut self) -> Result> { + let page = self.pager.read_page(self.root_page)?; + let page = RefCell::borrow(&page); + if page.is_locked() { + return Ok(CursorResult::IO); + } + + let page = page.contents.read().unwrap(); + let page = page.as_ref().unwrap(); + Ok(CursorResult::Ok(page.cell_count() == 0)) + } + fn get_next_record(&mut self) -> Result, Option)>> { loop { let mem_page = { @@ -184,6 +196,42 @@ impl BTreeCursor { .replace(Some(Rc::new(MemPage::new(None, self.root_page, 0)))); } + fn move_to_rightmost(&mut self) -> Result> { + self.move_to_root(); + + loop { + let mem_page = self.page.borrow().as_ref().unwrap().clone(); + let page_idx = mem_page.page_idx; + let page = self.pager.read_page(page_idx)?; + let page = RefCell::borrow(&page); + if page.is_locked() { + return Ok(CursorResult::IO); + } + let page = page.contents.read().unwrap(); + let page = page.as_ref().unwrap(); + if page.is_leaf() { + if page.cell_count() > 0 { + mem_page.cell_idx.replace(page.cell_count()-1); + } + return Ok(CursorResult::Ok(())); + } + + match page.rightmost_pointer() { + Some(right_most_pointer) => { + mem_page.cell_idx.replace(page.cell_count()); + let mem_page = + MemPage::new(Some(mem_page.clone()), right_most_pointer as usize, 0); + self.page.replace(Some(Rc::new(mem_page))); + continue; + }, + + None => { + unreachable!("interior page should have a rightmost pointer"); + } + } + } + } + pub fn move_to(&mut self, key: u64) -> Result> { // For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers. // B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data. @@ -827,6 +875,26 @@ fn find_free_cell(page_ref: &PageContent, db_header: Ref, amount } impl Cursor for BTreeCursor { + fn seek_to_last(&mut self) -> Result> { + self.move_to_rightmost()?; + match self.get_next_record()? { + CursorResult::Ok((rowid, next)) => { + if rowid.is_none() { + match self.is_empty_table()? { + CursorResult::Ok(is_empty) => { + assert!(is_empty) + }, + CursorResult::IO => (), + } + } + self.rowid.replace(rowid); + self.record.replace(next); + Ok(CursorResult::Ok(())) + } + CursorResult::IO => Ok(CursorResult::IO), + } + } + fn is_empty(&self) -> bool { self.record.borrow().is_none() } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 8d525d719..823510dd5 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -148,7 +148,7 @@ pub fn translate_insert( }, notnull_label, ); - program.emit_insn(Insn::NewRowid { reg: row_id_reg }); + program.emit_insn(Insn::NewRowid { cursor: cursor_id, rowid_reg: row_id_reg, prev_largest_reg: 0 }); program.resolve_label(notnull_label, program.offset()); program.emit_insn(Insn::MustBeInt { reg: row_id_reg }); diff --git a/core/types.rs b/core/types.rs index cc0165a61..d7d06c772 100644 --- a/core/types.rs +++ b/core/types.rs @@ -370,6 +370,7 @@ pub trait Cursor { fn wait_for_completion(&mut self) -> Result<()>; fn rowid(&self) -> Result>; fn seek_rowid(&mut self, rowid: u64) -> Result>; + fn seek_to_last(&mut self) -> Result>; fn record(&self) -> Result>>; fn insert( &mut self, diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 53020cd3c..ded6f0152 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -602,11 +602,11 @@ pub fn insn_to_str( 0, "".to_string(), ), - Insn::NewRowid { reg } => ( + Insn::NewRowid { cursor, rowid_reg, prev_largest_reg } => ( "NewRowId", - 0, - *reg as i32, - 0, + *cursor as i32, + *rowid_reg as i32, + *prev_largest_reg as i32, OwnedValue::Text(Rc::new("".to_string())), 0, "".to_string(), diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 67ce34f72..8c3a4c3f0 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -333,7 +333,9 @@ pub enum Insn { }, NewRowid { - reg: usize, + cursor: CursorID, // P1 + rowid_reg: usize, // P2 Destination register to store the new rowid + prev_largest_reg: usize // P3 Previous largest rowid in the table (Not used for now) }, MustBeInt { @@ -1412,7 +1414,16 @@ impl Program { cursor.wait_for_completion()?; state.pc += 1; } - Insn::NewRowid { reg: _ } => todo!(), + Insn::NewRowid { cursor, rowid_reg, .. } => { + let cursor = cursors.get_mut(cursor).unwrap(); + cursor.seek_to_last()?; + if let Some(rowid) = cursor.rowid()? { + state.registers[*rowid_reg] = OwnedValue::Integer((rowid+1) as i64); + } else { + state.registers[*rowid_reg] = OwnedValue::Integer(1); + } + state.pc += 1; + }, Insn::MustBeInt { reg } => { match state.registers[*reg] { OwnedValue::Integer(_) => {} diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index e489a99a4..b80ab6074 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -79,6 +79,10 @@ impl Cursor for Sorter { unimplemented!(); } + fn seek_to_last(&mut self) -> Result> { + unimplemented!(); + } + fn record(&self) -> Result>> { Ok(self.current.borrow()) } From 23a7d389b1bc919e4704ad4d316f1bd0391152b3 Mon Sep 17 00:00:00 2001 From: gandeevanr Date: Thu, 8 Aug 2024 19:04:10 -0700 Subject: [PATCH 2/2] added unit tests for NewRowid --- core/Cargo.toml | 2 + core/storage/btree.rs | 6 +- core/translate/insert.rs | 6 +- core/types.rs | 1 + core/vdbe/explain.rs | 6 +- core/vdbe/mod.rs | 196 ++++++++++++++++++++++++++++++++++++--- 6 files changed, 200 insertions(+), 17 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 331fe824f..2fd236807 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -45,6 +45,8 @@ indexmap = { version="2.2.6", features = ["serde"] } serde = { version = "1.0", features = ["derive"] } pest = "2.0" pest_derive = "2.0" +mockall = "0.13.0" +rand = "0.8.5" [target.'cfg(not(target_family = "windows"))'.dev-dependencies] pprof = { version = "0.12.1", features = ["criterion", "flamegraph"] } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 0bf6f4559..c243c47f3 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -211,7 +211,7 @@ impl BTreeCursor { let page = page.as_ref().unwrap(); if page.is_leaf() { if page.cell_count() > 0 { - mem_page.cell_idx.replace(page.cell_count()-1); + mem_page.cell_idx.replace(page.cell_count() - 1); } return Ok(CursorResult::Ok(())); } @@ -223,7 +223,7 @@ impl BTreeCursor { MemPage::new(Some(mem_page.clone()), right_most_pointer as usize, 0); self.page.replace(Some(Rc::new(mem_page))); continue; - }, + } None => { unreachable!("interior page should have a rightmost pointer"); @@ -883,7 +883,7 @@ impl Cursor for BTreeCursor { match self.is_empty_table()? { CursorResult::Ok(is_empty) => { assert!(is_empty) - }, + } CursorResult::IO => (), } } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 823510dd5..e3b2ae7e9 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -148,7 +148,11 @@ pub fn translate_insert( }, notnull_label, ); - program.emit_insn(Insn::NewRowid { cursor: cursor_id, rowid_reg: row_id_reg, prev_largest_reg: 0 }); + program.emit_insn(Insn::NewRowid { + cursor: cursor_id, + rowid_reg: row_id_reg, + prev_largest_reg: 0, + }); program.resolve_label(notnull_label, program.offset()); program.emit_insn(Insn::MustBeInt { reg: row_id_reg }); diff --git a/core/types.rs b/core/types.rs index d7d06c772..84eeaca8e 100644 --- a/core/types.rs +++ b/core/types.rs @@ -358,6 +358,7 @@ impl OwnedRecord { } } +#[derive(PartialEq, Debug)] pub enum CursorResult { Ok(T), IO, diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index ded6f0152..4209eb5cf 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -602,7 +602,11 @@ pub fn insn_to_str( 0, "".to_string(), ), - Insn::NewRowid { cursor, rowid_reg, prev_largest_reg } => ( + Insn::NewRowid { + cursor, + rowid_reg, + prev_largest_reg, + } => ( "NewRowId", *cursor as i32, *rowid_reg as i32, diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 8c3a4c3f0..bc20ad281 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -35,6 +35,8 @@ use crate::Result; use datetime::{exec_date, exec_time}; +use rand::distributions::{Distribution, Uniform}; +use rand::{thread_rng, Rng}; use regex::Regex; use std::borrow::BorrowMut; use std::cell::RefCell; @@ -333,9 +335,9 @@ pub enum Insn { }, NewRowid { - cursor: CursorID, // P1 - rowid_reg: usize, // P2 Destination register to store the new rowid - prev_largest_reg: usize // P3 Previous largest rowid in the table (Not used for now) + cursor: CursorID, // P1 + rowid_reg: usize, // P2 Destination register to store the new rowid + prev_largest_reg: usize, // P3 Previous largest rowid in the table (Not used for now) }, MustBeInt { @@ -1414,16 +1416,19 @@ impl Program { cursor.wait_for_completion()?; state.pc += 1; } - Insn::NewRowid { cursor, rowid_reg, .. } => { + Insn::NewRowid { + cursor, rowid_reg, .. + } => { let cursor = cursors.get_mut(cursor).unwrap(); - cursor.seek_to_last()?; - if let Some(rowid) = cursor.rowid()? { - state.registers[*rowid_reg] = OwnedValue::Integer((rowid+1) as i64); - } else { - state.registers[*rowid_reg] = OwnedValue::Integer(1); + let rowid = get_new_rowid(cursor, thread_rng())?; + match rowid { + CursorResult::Ok(rowid) => { + state.registers[*rowid_reg] = OwnedValue::Integer(rowid); + } + CursorResult::IO => return Ok(StepResult::IO), } state.pc += 1; - }, + } Insn::MustBeInt { reg } => { match state.registers[*reg] { OwnedValue::Integer(_) => {} @@ -1484,6 +1489,32 @@ impl Program { } } +fn get_new_rowid(cursor: &mut Box, mut rng: R) -> Result> { + cursor.seek_to_last()?; + let mut rowid = cursor.rowid()?.unwrap_or(0) + 1; + if rowid > std::i64::MAX.try_into().unwrap() { + let distribution = Uniform::from(1..=std::i64::MAX); + let max_attempts = 100; + for count in 0..max_attempts { + rowid = distribution.sample(&mut rng).try_into().unwrap(); + match cursor.seek_rowid(rowid)? { + CursorResult::Ok(false) => break, // Found a non-existing rowid + CursorResult::Ok(true) => { + if count == max_attempts - 1 { + return Err(LimboError::InternalError( + "Failed to generate a new rowid".to_string(), + )); + } else { + continue; // Try next random rowid + } + } + CursorResult::IO => return Ok(CursorResult::IO), + } + } + } + Ok(CursorResult::Ok(rowid.try_into().unwrap())) +} + fn make_record<'a>(registers: &'a [OwnedValue], start_reg: &usize, count: &usize) -> Record<'a> { let mut values = Vec::with_capacity(*count); for r in registers.iter().skip(*start_reg).take(*count) { @@ -1719,9 +1750,150 @@ fn exec_if(reg: &OwnedValue, null_reg: &OwnedValue, not: bool) -> bool { mod tests { use super::{ exec_abs, exec_if, exec_length, exec_like, exec_lower, exec_ltrim, exec_minmax, - exec_random, exec_round, exec_rtrim, exec_trim, exec_unicode, exec_upper, OwnedValue, + exec_random, exec_round, exec_rtrim, exec_trim, exec_unicode, exec_upper, get_new_rowid, + Cursor, CursorResult, LimboError, OwnedRecord, OwnedValue, Result, }; - use std::rc::Rc; + use mockall::{mock, predicate, predicate::*}; + use rand::{rngs::mock::StepRng, thread_rng}; + use std::{cell::Ref, rc::Rc}; + + mock! { + Cursor { + fn seek_to_last(&mut self) -> Result>; + fn rowid(&self) -> Result>; + fn seek_rowid(&mut self, rowid: u64) -> Result>; + } + } + + impl Cursor for MockCursor { + fn seek_to_last(&mut self) -> Result> { + return self.seek_to_last(); + } + + fn rowid(&self) -> Result> { + return self.rowid(); + } + + fn seek_rowid(&mut self, rowid: u64) -> Result> { + return self.seek_rowid(rowid); + } + + fn rewind(&mut self) -> Result> { + unimplemented!() + } + + fn next(&mut self) -> Result> { + unimplemented!() + } + + fn record(&self) -> Result>> { + unimplemented!() + } + + fn is_empty(&self) -> bool { + unimplemented!() + } + + fn set_null_flag(&mut self, _flag: bool) { + unimplemented!() + } + + fn get_null_flag(&self) -> bool { + unimplemented!() + } + + fn insert( + &mut self, + _key: &OwnedValue, + _record: &OwnedRecord, + _is_leaf: bool, + ) -> Result> { + unimplemented!() + } + + fn wait_for_completion(&mut self) -> Result<()> { + unimplemented!() + } + + fn exists(&mut self, _key: &OwnedValue) -> Result> { + unimplemented!() + } + } + + #[test] + fn test_get_new_rowid() -> Result<()> { + // Test case 0: Empty table + let mut mock = MockCursor::new(); + mock.expect_seek_to_last() + .return_once(|| Ok(CursorResult::Ok(()))); + mock.expect_rowid().return_once(|| Ok(None)); + + let result = get_new_rowid(&mut (Box::new(mock) as Box), thread_rng())?; + assert_eq!( + result, + CursorResult::Ok(1), + "For an empty table, rowid should be 1" + ); + + // Test case 1: Normal case, rowid within i64::MAX + let mut mock = MockCursor::new(); + mock.expect_seek_to_last() + .return_once(|| Ok(CursorResult::Ok(()))); + mock.expect_rowid().return_once(|| Ok(Some(100))); + + let result = get_new_rowid(&mut (Box::new(mock) as Box), thread_rng())?; + assert_eq!(result, CursorResult::Ok(101)); + + // Test case 2: Rowid exceeds i64::MAX, need to generate random rowid + let mut mock = MockCursor::new(); + mock.expect_seek_to_last() + .return_once(|| Ok(CursorResult::Ok(()))); + mock.expect_rowid() + .return_once(|| Ok(Some(std::i64::MAX as u64))); + mock.expect_seek_rowid() + .with(predicate::always()) + .returning(|rowid| { + if rowid == 50 { + Ok(CursorResult::Ok(false)) + } else { + Ok(CursorResult::Ok(true)) + } + }); + + // Mock the random number generation + let new_rowid = + get_new_rowid(&mut (Box::new(mock) as Box), StepRng::new(1, 1))?; + assert_eq!(new_rowid, CursorResult::Ok(50)); + + // Test case 3: IO error + let mut mock = MockCursor::new(); + mock.expect_seek_to_last() + .return_once(|| Ok(CursorResult::Ok(()))); + mock.expect_rowid() + .return_once(|| Ok(Some(std::i64::MAX as u64))); + mock.expect_seek_rowid() + .with(predicate::always()) + .return_once(|_| Ok(CursorResult::IO)); + + let result = get_new_rowid(&mut (Box::new(mock) as Box), thread_rng()); + assert!(matches!(result, Ok(CursorResult::IO))); + + // Test case 4: Failure to generate new rowid + let mut mock = MockCursor::new(); + mock.expect_seek_to_last() + .return_once(|| Ok(CursorResult::Ok(()))); + mock.expect_rowid() + .return_once(|| Ok(Some(std::i64::MAX as u64))); + mock.expect_seek_rowid() + .with(predicate::always()) + .returning(|_| Ok(CursorResult::Ok(true))); + + // Mock the random number generation + let result = get_new_rowid(&mut (Box::new(mock) as Box), StepRng::new(1, 1)); + assert!(matches!(result, Err(LimboError::InternalError(_)))); + + Ok(()) + } #[test] fn test_length() {