mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-10 11:44:22 +01:00
Merge 'Implement NewRowid' from GV
Fixes https://github.com/penberg/limbo/issues/275 Closes #278
This commit is contained in:
@@ -45,6 +45,8 @@ indexmap = { version="2.2.6", features = ["serde"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
pest = "2.0"
|
||||
pest_derive = "2.0"
|
||||
mockall = "0.13.0"
|
||||
rand = "0.8.5"
|
||||
|
||||
[target.'cfg(not(target_family = "windows"))'.dev-dependencies]
|
||||
pprof = { version = "0.12.1", features = ["criterion", "flamegraph"] }
|
||||
|
||||
@@ -50,6 +50,10 @@ impl Cursor for PseudoCursor {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn seek_to_last(&mut self) -> Result<CursorResult<()>> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn record(&self) -> Result<Ref<Option<OwnedRecord>>> {
|
||||
Ok(self.current.borrow())
|
||||
}
|
||||
|
||||
@@ -72,6 +72,18 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
fn is_empty_table(&mut self) -> Result<CursorResult<bool>> {
|
||||
let page = self.pager.read_page(self.root_page)?;
|
||||
let page = RefCell::borrow(&page);
|
||||
if page.is_locked() {
|
||||
return Ok(CursorResult::IO);
|
||||
}
|
||||
|
||||
let page = page.contents.read().unwrap();
|
||||
let page = page.as_ref().unwrap();
|
||||
Ok(CursorResult::Ok(page.cell_count() == 0))
|
||||
}
|
||||
|
||||
fn get_next_record(&mut self) -> Result<CursorResult<(Option<u64>, Option<OwnedRecord>)>> {
|
||||
loop {
|
||||
let mem_page = {
|
||||
@@ -184,6 +196,42 @@ impl BTreeCursor {
|
||||
.replace(Some(Rc::new(MemPage::new(None, self.root_page, 0))));
|
||||
}
|
||||
|
||||
fn move_to_rightmost(&mut self) -> Result<CursorResult<()>> {
|
||||
self.move_to_root();
|
||||
|
||||
loop {
|
||||
let mem_page = self.page.borrow().as_ref().unwrap().clone();
|
||||
let page_idx = mem_page.page_idx;
|
||||
let page = self.pager.read_page(page_idx)?;
|
||||
let page = RefCell::borrow(&page);
|
||||
if page.is_locked() {
|
||||
return Ok(CursorResult::IO);
|
||||
}
|
||||
let page = page.contents.read().unwrap();
|
||||
let page = page.as_ref().unwrap();
|
||||
if page.is_leaf() {
|
||||
if page.cell_count() > 0 {
|
||||
mem_page.cell_idx.replace(page.cell_count() - 1);
|
||||
}
|
||||
return Ok(CursorResult::Ok(()));
|
||||
}
|
||||
|
||||
match page.rightmost_pointer() {
|
||||
Some(right_most_pointer) => {
|
||||
mem_page.cell_idx.replace(page.cell_count());
|
||||
let mem_page =
|
||||
MemPage::new(Some(mem_page.clone()), right_most_pointer as usize, 0);
|
||||
self.page.replace(Some(Rc::new(mem_page)));
|
||||
continue;
|
||||
}
|
||||
|
||||
None => {
|
||||
unreachable!("interior page should have a rightmost pointer");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn move_to(&mut self, key: u64) -> Result<CursorResult<()>> {
|
||||
// For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers.
|
||||
// B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data.
|
||||
@@ -827,6 +875,26 @@ fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount
|
||||
}
|
||||
|
||||
impl Cursor for BTreeCursor {
|
||||
fn seek_to_last(&mut self) -> Result<CursorResult<()>> {
|
||||
self.move_to_rightmost()?;
|
||||
match self.get_next_record()? {
|
||||
CursorResult::Ok((rowid, next)) => {
|
||||
if rowid.is_none() {
|
||||
match self.is_empty_table()? {
|
||||
CursorResult::Ok(is_empty) => {
|
||||
assert!(is_empty)
|
||||
}
|
||||
CursorResult::IO => (),
|
||||
}
|
||||
}
|
||||
self.rowid.replace(rowid);
|
||||
self.record.replace(next);
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
CursorResult::IO => Ok(CursorResult::IO),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
self.record.borrow().is_none()
|
||||
}
|
||||
|
||||
@@ -148,7 +148,11 @@ pub fn translate_insert(
|
||||
},
|
||||
notnull_label,
|
||||
);
|
||||
program.emit_insn(Insn::NewRowid { reg: row_id_reg });
|
||||
program.emit_insn(Insn::NewRowid {
|
||||
cursor: cursor_id,
|
||||
rowid_reg: row_id_reg,
|
||||
prev_largest_reg: 0,
|
||||
});
|
||||
|
||||
program.resolve_label(notnull_label, program.offset());
|
||||
program.emit_insn(Insn::MustBeInt { reg: row_id_reg });
|
||||
|
||||
@@ -358,6 +358,7 @@ impl OwnedRecord {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Debug)]
|
||||
pub enum CursorResult<T> {
|
||||
Ok(T),
|
||||
IO,
|
||||
@@ -370,6 +371,7 @@ pub trait Cursor {
|
||||
fn wait_for_completion(&mut self) -> Result<()>;
|
||||
fn rowid(&self) -> Result<Option<u64>>;
|
||||
fn seek_rowid(&mut self, rowid: u64) -> Result<CursorResult<bool>>;
|
||||
fn seek_to_last(&mut self) -> Result<CursorResult<()>>;
|
||||
fn record(&self) -> Result<Ref<Option<OwnedRecord>>>;
|
||||
fn insert(
|
||||
&mut self,
|
||||
|
||||
@@ -602,11 +602,15 @@ pub fn insn_to_str(
|
||||
0,
|
||||
"".to_string(),
|
||||
),
|
||||
Insn::NewRowid { reg } => (
|
||||
Insn::NewRowid {
|
||||
cursor,
|
||||
rowid_reg,
|
||||
prev_largest_reg,
|
||||
} => (
|
||||
"NewRowId",
|
||||
0,
|
||||
*reg as i32,
|
||||
0,
|
||||
*cursor as i32,
|
||||
*rowid_reg as i32,
|
||||
*prev_largest_reg as i32,
|
||||
OwnedValue::Text(Rc::new("".to_string())),
|
||||
0,
|
||||
"".to_string(),
|
||||
|
||||
191
core/vdbe/mod.rs
191
core/vdbe/mod.rs
@@ -35,6 +35,8 @@ use crate::Result;
|
||||
|
||||
use datetime::{exec_date, exec_time};
|
||||
|
||||
use rand::distributions::{Distribution, Uniform};
|
||||
use rand::{thread_rng, Rng};
|
||||
use regex::Regex;
|
||||
use std::borrow::BorrowMut;
|
||||
use std::cell::RefCell;
|
||||
@@ -333,7 +335,9 @@ pub enum Insn {
|
||||
},
|
||||
|
||||
NewRowid {
|
||||
reg: usize,
|
||||
cursor: CursorID, // P1
|
||||
rowid_reg: usize, // P2 Destination register to store the new rowid
|
||||
prev_largest_reg: usize, // P3 Previous largest rowid in the table (Not used for now)
|
||||
},
|
||||
|
||||
MustBeInt {
|
||||
@@ -1412,7 +1416,19 @@ impl Program {
|
||||
cursor.wait_for_completion()?;
|
||||
state.pc += 1;
|
||||
}
|
||||
Insn::NewRowid { reg: _ } => todo!(),
|
||||
Insn::NewRowid {
|
||||
cursor, rowid_reg, ..
|
||||
} => {
|
||||
let cursor = cursors.get_mut(cursor).unwrap();
|
||||
let rowid = get_new_rowid(cursor, thread_rng())?;
|
||||
match rowid {
|
||||
CursorResult::Ok(rowid) => {
|
||||
state.registers[*rowid_reg] = OwnedValue::Integer(rowid);
|
||||
}
|
||||
CursorResult::IO => return Ok(StepResult::IO),
|
||||
}
|
||||
state.pc += 1;
|
||||
}
|
||||
Insn::MustBeInt { reg } => {
|
||||
match state.registers[*reg] {
|
||||
OwnedValue::Integer(_) => {}
|
||||
@@ -1473,6 +1489,32 @@ impl Program {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_new_rowid<R: Rng>(cursor: &mut Box<dyn Cursor>, mut rng: R) -> Result<CursorResult<i64>> {
|
||||
cursor.seek_to_last()?;
|
||||
let mut rowid = cursor.rowid()?.unwrap_or(0) + 1;
|
||||
if rowid > std::i64::MAX.try_into().unwrap() {
|
||||
let distribution = Uniform::from(1..=std::i64::MAX);
|
||||
let max_attempts = 100;
|
||||
for count in 0..max_attempts {
|
||||
rowid = distribution.sample(&mut rng).try_into().unwrap();
|
||||
match cursor.seek_rowid(rowid)? {
|
||||
CursorResult::Ok(false) => break, // Found a non-existing rowid
|
||||
CursorResult::Ok(true) => {
|
||||
if count == max_attempts - 1 {
|
||||
return Err(LimboError::InternalError(
|
||||
"Failed to generate a new rowid".to_string(),
|
||||
));
|
||||
} else {
|
||||
continue; // Try next random rowid
|
||||
}
|
||||
}
|
||||
CursorResult::IO => return Ok(CursorResult::IO),
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(CursorResult::Ok(rowid.try_into().unwrap()))
|
||||
}
|
||||
|
||||
fn make_record<'a>(registers: &'a [OwnedValue], start_reg: &usize, count: &usize) -> Record<'a> {
|
||||
let mut values = Vec::with_capacity(*count);
|
||||
for r in registers.iter().skip(*start_reg).take(*count) {
|
||||
@@ -1708,9 +1750,150 @@ fn exec_if(reg: &OwnedValue, null_reg: &OwnedValue, not: bool) -> bool {
|
||||
mod tests {
|
||||
use super::{
|
||||
exec_abs, exec_if, exec_length, exec_like, exec_lower, exec_ltrim, exec_minmax,
|
||||
exec_random, exec_round, exec_rtrim, exec_trim, exec_unicode, exec_upper, OwnedValue,
|
||||
exec_random, exec_round, exec_rtrim, exec_trim, exec_unicode, exec_upper, get_new_rowid,
|
||||
Cursor, CursorResult, LimboError, OwnedRecord, OwnedValue, Result,
|
||||
};
|
||||
use std::rc::Rc;
|
||||
use mockall::{mock, predicate, predicate::*};
|
||||
use rand::{rngs::mock::StepRng, thread_rng};
|
||||
use std::{cell::Ref, rc::Rc};
|
||||
|
||||
mock! {
|
||||
Cursor {
|
||||
fn seek_to_last(&mut self) -> Result<CursorResult<()>>;
|
||||
fn rowid(&self) -> Result<Option<u64>>;
|
||||
fn seek_rowid(&mut self, rowid: u64) -> Result<CursorResult<bool>>;
|
||||
}
|
||||
}
|
||||
|
||||
impl Cursor for MockCursor {
|
||||
fn seek_to_last(&mut self) -> Result<CursorResult<()>> {
|
||||
return self.seek_to_last();
|
||||
}
|
||||
|
||||
fn rowid(&self) -> Result<Option<u64>> {
|
||||
return self.rowid();
|
||||
}
|
||||
|
||||
fn seek_rowid(&mut self, rowid: u64) -> Result<CursorResult<bool>> {
|
||||
return self.seek_rowid(rowid);
|
||||
}
|
||||
|
||||
fn rewind(&mut self) -> Result<CursorResult<()>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn next(&mut self) -> Result<CursorResult<()>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn record(&self) -> Result<Ref<Option<OwnedRecord>>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn set_null_flag(&mut self, _flag: bool) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn get_null_flag(&self) -> bool {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn insert(
|
||||
&mut self,
|
||||
_key: &OwnedValue,
|
||||
_record: &OwnedRecord,
|
||||
_is_leaf: bool,
|
||||
) -> Result<CursorResult<()>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn wait_for_completion(&mut self) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn exists(&mut self, _key: &OwnedValue) -> Result<CursorResult<bool>> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_new_rowid() -> Result<()> {
|
||||
// Test case 0: Empty table
|
||||
let mut mock = MockCursor::new();
|
||||
mock.expect_seek_to_last()
|
||||
.return_once(|| Ok(CursorResult::Ok(())));
|
||||
mock.expect_rowid().return_once(|| Ok(None));
|
||||
|
||||
let result = get_new_rowid(&mut (Box::new(mock) as Box<dyn Cursor>), thread_rng())?;
|
||||
assert_eq!(
|
||||
result,
|
||||
CursorResult::Ok(1),
|
||||
"For an empty table, rowid should be 1"
|
||||
);
|
||||
|
||||
// Test case 1: Normal case, rowid within i64::MAX
|
||||
let mut mock = MockCursor::new();
|
||||
mock.expect_seek_to_last()
|
||||
.return_once(|| Ok(CursorResult::Ok(())));
|
||||
mock.expect_rowid().return_once(|| Ok(Some(100)));
|
||||
|
||||
let result = get_new_rowid(&mut (Box::new(mock) as Box<dyn Cursor>), thread_rng())?;
|
||||
assert_eq!(result, CursorResult::Ok(101));
|
||||
|
||||
// Test case 2: Rowid exceeds i64::MAX, need to generate random rowid
|
||||
let mut mock = MockCursor::new();
|
||||
mock.expect_seek_to_last()
|
||||
.return_once(|| Ok(CursorResult::Ok(())));
|
||||
mock.expect_rowid()
|
||||
.return_once(|| Ok(Some(std::i64::MAX as u64)));
|
||||
mock.expect_seek_rowid()
|
||||
.with(predicate::always())
|
||||
.returning(|rowid| {
|
||||
if rowid == 50 {
|
||||
Ok(CursorResult::Ok(false))
|
||||
} else {
|
||||
Ok(CursorResult::Ok(true))
|
||||
}
|
||||
});
|
||||
|
||||
// Mock the random number generation
|
||||
let new_rowid =
|
||||
get_new_rowid(&mut (Box::new(mock) as Box<dyn Cursor>), StepRng::new(1, 1))?;
|
||||
assert_eq!(new_rowid, CursorResult::Ok(50));
|
||||
|
||||
// Test case 3: IO error
|
||||
let mut mock = MockCursor::new();
|
||||
mock.expect_seek_to_last()
|
||||
.return_once(|| Ok(CursorResult::Ok(())));
|
||||
mock.expect_rowid()
|
||||
.return_once(|| Ok(Some(std::i64::MAX as u64)));
|
||||
mock.expect_seek_rowid()
|
||||
.with(predicate::always())
|
||||
.return_once(|_| Ok(CursorResult::IO));
|
||||
|
||||
let result = get_new_rowid(&mut (Box::new(mock) as Box<dyn Cursor>), thread_rng());
|
||||
assert!(matches!(result, Ok(CursorResult::IO)));
|
||||
|
||||
// Test case 4: Failure to generate new rowid
|
||||
let mut mock = MockCursor::new();
|
||||
mock.expect_seek_to_last()
|
||||
.return_once(|| Ok(CursorResult::Ok(())));
|
||||
mock.expect_rowid()
|
||||
.return_once(|| Ok(Some(std::i64::MAX as u64)));
|
||||
mock.expect_seek_rowid()
|
||||
.with(predicate::always())
|
||||
.returning(|_| Ok(CursorResult::Ok(true)));
|
||||
|
||||
// Mock the random number generation
|
||||
let result = get_new_rowid(&mut (Box::new(mock) as Box<dyn Cursor>), StepRng::new(1, 1));
|
||||
assert!(matches!(result, Err(LimboError::InternalError(_))));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_length() {
|
||||
|
||||
@@ -79,6 +79,10 @@ impl Cursor for Sorter {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn seek_to_last(&mut self) -> Result<CursorResult<()>> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn record(&self) -> Result<Ref<Option<OwnedRecord>>> {
|
||||
Ok(self.current.borrow())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user