diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 558a6eee3..0f094ac5b 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -12,12 +12,14 @@ use crate::types::{ use crate::{return_corrupt, LimboError, Result}; use std::cell::{Cell, Ref, RefCell}; +use std::cmp::Ordering; use std::pin::Pin; use std::rc::Rc; use super::pager::PageRef; use super::sqlite3_ondisk::{ - write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, DATABASE_HEADER_SIZE, + read_record, write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, + DATABASE_HEADER_SIZE, }; /* @@ -599,8 +601,8 @@ impl BTreeCursor { BTreeCell::TableLeafCell(TableLeafCell { _rowid, _payload, - first_overflow_page, payload_size, + first_overflow_page, }) => { assert!(predicate.is_none()); if let Some(next_page) = first_overflow_page { @@ -814,10 +816,8 @@ impl BTreeCursor { }; let record = self.get_immutable_record(); let record = record.as_ref().unwrap(); - let order = compare_immutable( - &record.get_values().as_slice()[..record.len() - 1], - &index_key.get_values().as_slice()[..], - ); + let without_rowid = &record.get_values().as_slice()[..record.len() - 1]; + let order = without_rowid.cmp(index_key.get_values()); let found = match op { SeekOp::GT => order.is_gt(), SeekOp::GE => order.is_ge(), @@ -1047,6 +1047,65 @@ impl BTreeCursor { } } + pub fn insert_index_key(&mut self, key: &ImmutableRecord) -> Result> { + if let CursorState::None = &self.state { + self.state = CursorState::Write(WriteInfo::new()); + } + + let ret = loop { + let write_state = self.state.mut_write_info().unwrap().state; + match write_state { + WriteState::Start => { + let page = self.stack.top(); + return_if_locked!(page); + page.set_dirty(); + self.pager.add_dirty(page.get().id); + let page = page.get().contents.as_mut().unwrap(); + + assert!(matches!(page.page_type(), PageType::IndexLeaf)); + let cell_idx = self.find_index_cell(page, key); + let mut cell_payload: Vec = Vec::new(); + fill_cell_payload( + page.page_type(), + None, + &mut cell_payload, + key, + self.usable_space() as u16, + self.pager.clone(), + ); + // insert + let overflow = { + debug!( + "insert_index_key(overflow, cell_count={})", + page.cell_count() + ); + insert_into_cell( + page, + cell_payload.as_slice(), + cell_idx, + self.usable_space() as u16, + )?; + page.overflow_cells.len() + }; + let write_info = self.state.mut_write_info().unwrap(); + write_info.state = if overflow > 0 { + WriteState::BalanceStart + } else { + WriteState::Finish + }; + } + WriteState::BalanceStart + | WriteState::BalanceNonRoot + | WriteState::BalanceNonRootWaitLoadPages => { + return_if_io!(self.balance()); + } + WriteState::Finish => break Ok(CursorResult::Ok(())), + } + }; + self.state = CursorState::None; + ret + } + /// Insert a record into the btree. /// If the insert operation overflows the page, it will be split and the btree will be balanced. fn insert_into_page( @@ -1943,6 +2002,74 @@ impl BTreeCursor { cell_idx } + fn find_index_cell(&self, page: &PageContent, key: &ImmutableRecord) -> usize { + let mut cell_idx = 0; + let cell_count = page.cell_count(); + while cell_idx < cell_count { + match page + .cell_get( + cell_idx, + payload_overflow_threshold_max(page.page_type(), self.usable_space() as u16), + payload_overflow_threshold_min(page.page_type(), self.usable_space() as u16), + self.usable_space(), + ) + .unwrap() + { + BTreeCell::IndexInteriorCell(IndexInteriorCell { payload, .. }) + | BTreeCell::IndexLeafCell(IndexLeafCell { payload, .. }) => { + read_record( + payload, + self.get_immutable_record_or_create().as_mut().unwrap(), + ) + .expect("failed to read record"); + let order = compare_immutable( + key.get_values(), + self.get_immutable_record().as_ref().unwrap().get_values(), + ); + match order { + Ordering::Less => { + break; + } + Ordering::Equal => { + break; + } + Ordering::Greater => {} + } + } + _ => unreachable!("Expected Index cell types"), + } + cell_idx += 1; + } + cell_idx + } + + pub fn seek_end(&mut self) -> Result> { + assert!(self.mv_cursor.is_none()); + self.move_to_root(); + loop { + let mem_page = self.stack.top(); + let page_id = mem_page.get().id; + let page = self.pager.read_page(page_id)?; + return_if_locked!(page); + + let contents = page.get().contents.as_ref().unwrap(); + if contents.is_leaf() { + // set cursor just past the last cell to append + self.stack.set_cell_index(contents.cell_count() as i32); + return Ok(CursorResult::Ok(())); + } + + match contents.rightmost_pointer() { + Some(right_most_pointer) => { + self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior + let child = self.pager.read_page(right_most_pointer as usize)?; + self.stack.push(child); + } + None => unreachable!("interior page must have rightmost pointer"), + } + } + } + pub fn seek_to_last(&mut self) -> Result> { return_if_io!(self.move_to_rightmost()); let rowid = return_if_io!(self.get_next_record(None)); @@ -2372,6 +2499,27 @@ impl BTreeCursor { self.null_flag } + /// Search for a key in an Index Btree. Looking up indexes that need to be unique, we cannot compare the rowid + pub fn key_exists_in_index(&mut self, key: &ImmutableRecord) -> Result> { + return_if_io!(self.do_seek(SeekKey::IndexKey(key), SeekOp::GE)); + if let Some(record) = self.record().as_ref() { + // get existing record, excluding the rowid + assert!(record.len() > 0); + let existing_key = &record.get_values()[..record.count() - 1]; + let inserted_key_vals = &key.get_values(); + if existing_key + .iter() + .zip(inserted_key_vals.iter()) + .all(|(a, b)| a == b) + { + return Ok(CursorResult::Ok(true)); // duplicate + } + } else { + return Err(LimboError::InvalidArgument("Expected Record key".into())); + } + Ok(CursorResult::Ok(false)) // no matching key found + } + pub fn exists(&mut self, key: &OwnedValue) -> Result> { assert!(self.mv_cursor.is_none()); let int_key = match key { diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 5fda098e6..d6faeb2c8 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -153,6 +153,7 @@ pub fn translate_insert( program.emit_insn(Insn::OpenWriteAsync { cursor_id, root_page, + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); @@ -169,6 +170,7 @@ pub fn translate_insert( program.emit_insn(Insn::OpenWriteAsync { cursor_id, root_page, + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index 7b51a2328..d9b7d6a36 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -103,6 +103,7 @@ pub fn init_loop( program.emit_insn(Insn::OpenWriteAsync { cursor_id, root_page, + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); } @@ -111,6 +112,7 @@ pub fn init_loop( program.emit_insn(Insn::OpenWriteAsync { cursor_id, root_page, + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); } @@ -145,6 +147,7 @@ pub fn init_loop( program.emit_insn(Insn::OpenWriteAsync { cursor_id: table_cursor_id, root_page: table.table.get_root_page(), + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); } @@ -152,6 +155,7 @@ pub fn init_loop( program.emit_insn(Insn::OpenWriteAsync { cursor_id: table_cursor_id, root_page: table.table.get_root_page(), + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); } @@ -178,6 +182,7 @@ pub fn init_loop( program.emit_insn(Insn::OpenWriteAsync { cursor_id: index_cursor_id, root_page: index.root_page, + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); } @@ -185,6 +190,7 @@ pub fn init_loop( program.emit_insn(Insn::OpenWriteAsync { cursor_id: index_cursor_id, root_page: index.root_page, + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); } diff --git a/core/translate/mod.rs b/core/translate/mod.rs index 739ae5f03..cf93b34ba 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -12,6 +12,7 @@ pub(crate) mod delete; pub(crate) mod emitter; pub(crate) mod expr; pub(crate) mod group_by; +pub(crate) mod index; pub(crate) mod insert; pub(crate) mod main_loop; pub(crate) mod optimizer; @@ -34,6 +35,7 @@ use crate::translate::delete::translate_delete; use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts, QueryMode}; use crate::vdbe::Program; use crate::{bail_parse_error, Connection, Result, SymbolTable}; +use index::translate_create_index; use insert::translate_insert; use limbo_sqlite3_parser::ast::{self, Delete, Insert}; use schema::{translate_create_table, translate_create_virtual_table, translate_drop_table}; @@ -61,7 +63,24 @@ pub fn translate( ast::Stmt::Attach { .. } => bail_parse_error!("ATTACH not supported yet"), ast::Stmt::Begin(tx_type, tx_name) => translate_tx_begin(tx_type, tx_name)?, ast::Stmt::Commit(tx_name) => translate_tx_commit(tx_name)?, - ast::Stmt::CreateIndex { .. } => bail_parse_error!("CREATE INDEX not supported yet"), + ast::Stmt::CreateIndex { + unique, + if_not_exists, + idx_name, + tbl_name, + columns, + .. + } => { + change_cnt_on = true; + translate_create_index( + query_mode, + (unique, if_not_exists), + &idx_name.name.0, + &tbl_name.0, + &columns, + schema, + )? + } ast::Stmt::CreateTable { temporary, if_not_exists, diff --git a/core/translate/schema.rs b/core/translate/schema.rs index 29cf29644..6f87937c9 100644 --- a/core/translate/schema.rs +++ b/core/translate/schema.rs @@ -104,6 +104,7 @@ pub fn translate_create_table( program.emit_insn(Insn::OpenWriteAsync { cursor_id: sqlite_schema_cursor_id, root_page: 1, + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); @@ -155,8 +156,8 @@ pub fn translate_create_table( Ok(program) } -#[derive(Debug)] -enum SchemaEntryType { +#[derive(Debug, Clone, Copy)] +pub enum SchemaEntryType { Table, Index, } @@ -169,9 +170,9 @@ impl SchemaEntryType { } } } -const SQLITE_TABLEID: &str = "sqlite_schema"; +pub const SQLITE_TABLEID: &str = "sqlite_schema"; -fn emit_schema_entry( +pub fn emit_schema_entry( program: &mut ProgramBuilder, sqlite_schema_cursor_id: usize, entry_type: SchemaEntryType, @@ -501,6 +502,7 @@ pub fn translate_create_virtual_table( program.emit_insn(Insn::OpenWriteAsync { cursor_id: sqlite_schema_cursor_id, root_page: 1, + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); @@ -572,7 +574,7 @@ pub fn translate_drop_table( let row_id_reg = program.alloc_register(); // r5 let table_name = "sqlite_schema"; - let schema_table = schema.get_btree_table(&table_name).unwrap(); + let schema_table = schema.get_btree_table(table_name).unwrap(); let sqlite_schema_cursor_id = program.alloc_cursor_id( Some(table_name.to_string()), CursorType::BTreeTable(schema_table.clone()), @@ -580,6 +582,7 @@ pub fn translate_drop_table( program.emit_insn(Insn::OpenWriteAsync { cursor_id: sqlite_schema_cursor_id, root_page: 1, + is_new_idx: false, }); program.emit_insn(Insn::OpenWriteAwait {}); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 3c511a0db..89bd26859 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -21,7 +21,7 @@ use crate::util::{ checked_cast_text_to_numeric, parse_schema_rows, RoundToPrecision, }; use crate::vdbe::builder::CursorType; -use crate::vdbe::insn::Insn; +use crate::vdbe::insn::{IdxInsertFlags, Insn}; use crate::vector::{vector32, vector64, vector_distance_cos, vector_extract}; use crate::{info, MvCursor, RefValue, Row, StepResult, TransactionState}; @@ -2049,6 +2049,24 @@ pub fn op_idx_ge( Ok(InsnFunctionStepResult::Step) } +pub fn op_seek_end( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Rc, + mv_store: Option<&Rc>, +) -> Result { + if let Insn::SeekEnd { cursor_id } = *insn { + let mut cursor = state.get_cursor(cursor_id); + let cursor = cursor.as_btree_mut(); + return_if_io!(cursor.seek_end()); + } else { + unreachable!("unexpected Insn {:?}", insn) + } + state.pc += 1; + Ok(InsnFunctionStepResult::Step) +} + pub fn op_idx_le( program: &Program, state: &mut ProgramState, @@ -3706,6 +3724,73 @@ pub fn op_delete_async( Ok(InsnFunctionStepResult::Step) } +pub fn op_idx_insert_async( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Rc, + mv_store: Option<&Rc>, +) -> Result { + if let Insn::IdxInsertAsync { + cursor_id, + record_reg, + flags, + .. + } = *insn + { + let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap(); + let CursorType::BTreeIndex(index_meta) = cursor_type else { + panic!("IdxInsert: not a BTree index cursor"); + }; + { + let mut cursor = state.get_cursor(cursor_id); + let cursor = cursor.as_btree_mut(); + let record = match &state.registers[record_reg] { + Register::Record(ref r) => r, + _ => return Err(LimboError::InternalError("expected record".into())), + }; + let moved_before = if index_meta.unique { + // check for uniqueness violation + match cursor.key_exists_in_index(record)? { + CursorResult::Ok(true) => { + return Err(LimboError::Constraint( + "UNIQUE constraint failed: duplicate key".into(), + )) + } + CursorResult::IO => return Ok(InsnFunctionStepResult::IO), + CursorResult::Ok(false) => {} + }; + false + } else { + flags.has(IdxInsertFlags::USE_SEEK) + }; + // insert record as key + return_if_io!(cursor.insert_index_key(record)); + } + state.pc += 1; + } + Ok(InsnFunctionStepResult::Step) +} + +pub fn op_idx_insert_await( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Rc, + mv_store: Option<&Rc>, +) -> Result { + if let Insn::IdxInsertAwait { cursor_id } = *insn { + { + let mut cursor = state.get_cursor(cursor_id); + let cursor = cursor.as_btree_mut(); + cursor.wait_for_completion()?; + } + // TODO: flag optimizations, update n_change if OPFLAG_NCHANGE + state.pc += 1; + } + Ok(InsnFunctionStepResult::Step) +} + pub fn op_delete_await( program: &Program, state: &mut ProgramState, @@ -3889,6 +3974,7 @@ pub fn op_open_write_async( let Insn::OpenWriteAsync { cursor_id, root_page, + .. } = insn else { unreachable!("unexpected Insn {:?}", insn) diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 67333c334..f80a442f1 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -760,6 +760,39 @@ pub fn insn_to_str( 0, "".to_string(), ), + Insn::SeekEnd { cursor_id } => ( + "SeekEnd", + *cursor_id as i32, + 0, + 0, + OwnedValue::build_text(""), + 0, + "".to_string(), + ), + Insn::IdxInsertAsync { + cursor_id, + record_reg, + unpacked_start, + flags, + .. + } => ( + "IdxInsertAsync", + *cursor_id as i32, + *record_reg as i32, + unpacked_start.unwrap_or(0) as i32, + OwnedValue::build_text(""), + flags.0 as u16, + format!("key=r[{}]", record_reg), + ), + Insn::IdxInsertAwait { cursor_id } => ( + "IdxInsertAwait", + *cursor_id as i32, + 0, + 0, + OwnedValue::build_text(""), + 0, + "".to_string(), + ), Insn::IdxGT { cursor_id, start_reg, @@ -1097,6 +1130,7 @@ pub fn insn_to_str( Insn::OpenWriteAsync { cursor_id, root_page, + .. } => ( "OpenWriteAsync", *cursor_id as i32, diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index f45e7ce35..5ce68f14a 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -38,6 +38,44 @@ impl CmpInsFlags { } } +#[derive(Clone, Copy, Debug, Default)] +pub struct IdxInsertFlags(pub u8); +impl IdxInsertFlags { + pub const APPEND: u8 = 0x01; // Hint: insert likely at the end + pub const NCHANGE: u8 = 0x02; // Increment the change counter + pub const USE_SEEK: u8 = 0x04; // Skip seek if last one was same key + pub fn new() -> Self { + IdxInsertFlags(0) + } + pub fn has(&self, flag: u8) -> bool { + (self.0 & flag) != 0 + } + pub fn append(mut self, append: bool) -> Self { + if append { + self.0 |= IdxInsertFlags::APPEND; + } else { + self.0 &= !IdxInsertFlags::APPEND; + } + self + } + pub fn use_seek(mut self, seek: bool) -> Self { + if seek { + self.0 |= IdxInsertFlags::USE_SEEK; + } else { + self.0 &= !IdxInsertFlags::USE_SEEK; + } + self + } + pub fn nchange(mut self, change: bool) -> Self { + if change { + self.0 |= IdxInsertFlags::NCHANGE; + } else { + self.0 &= !IdxInsertFlags::NCHANGE; + } + self + } +} + #[derive(Description, Debug)] pub enum Insn { /// Initialize the program state and jump to the given PC. @@ -401,6 +439,9 @@ pub enum Insn { src_reg: usize, target_pc: BranchOffset, }, + SeekEnd { + cursor_id: CursorID, + }, /// P1 is an open index cursor and P3 is a cursor on the corresponding table. This opcode does a deferred seek of the P3 table cursor to the row that corresponds to the current row of P1. /// This is a deferred seek. Nothing actually happens until the cursor is used to read a record. That way, if no reads occur, no unnecessary I/O happens. @@ -431,8 +472,20 @@ pub enum Insn { target_pc: BranchOffset, }, - /// The P4 register values beginning with P3 form an unpacked index key that omits the PRIMARY KEY. Compare this key value against the index that P1 is currently pointing to, ignoring the PRIMARY KEY or ROWID fields at the end. - /// If the P1 index entry is greater or equal than the key value then jump to P2. Otherwise fall through to the next instruction. + // cursor_id is a cursor pointing to a B-Tree index that uses integer keys, this op writes the value obtained from MakeRecord into the index. + // P3 + P4 are for the original column values that make up that key in unpacked (pre-serialized) form. + // If P5 has the OPFLAG_APPEND bit set, that is a hint to the b-tree layer that this insert is likely to be an append. + // OPFLAG_NCHANGE bit set, then the change counter is incremented by this instruction. If the OPFLAG_NCHANGE bit is clear, then the change counter is unchanged + IdxInsertAsync { + cursor_id: CursorID, + record_reg: usize, // P2 the register containing the record to insert + unpacked_start: Option, // P3 the index of the first register for the unpacked key + unpacked_count: Option, // P4 # of unpacked values in the key in P2 + flags: IdxInsertFlags, // TODO: optimization + }, + IdxInsertAwait { + cursor_id: CursorID, + }, IdxGE { cursor_id: CursorID, start_reg: usize, @@ -588,6 +641,7 @@ pub enum Insn { OpenWriteAsync { cursor_id: CursorID, root_page: PageIdx, + is_new_idx: bool, }, OpenWriteAwait {}, @@ -1237,10 +1291,13 @@ impl Insn { Insn::DeferredSeek { .. } => execute::op_deferred_seek, Insn::SeekGE { .. } => execute::op_seek_ge, Insn::SeekGT { .. } => execute::op_seek_gt, + Insn::SeekEnd { .. } => execute::op_seek_end, Insn::IdxGE { .. } => execute::op_idx_ge, Insn::IdxGT { .. } => execute::op_idx_gt, Insn::IdxLE { .. } => execute::op_idx_le, Insn::IdxLT { .. } => execute::op_idx_lt, + Insn::IdxInsertAsync { .. } => execute::op_idx_insert_async, + Insn::IdxInsertAwait { .. } => execute::op_idx_insert_await, Insn::DecrJumpZero { .. } => execute::op_decr_jump_zero, Insn::AggStep { .. } => execute::op_agg_step,