diff --git a/core/lib.rs b/core/lib.rs index d9be4e0f6..9632e1829 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -85,6 +85,8 @@ enum TransactionState { pub(crate) type MvStore = crate::mvcc::MvStore; +pub(crate) type MvCursor = crate::mvcc::cursor::ScanCursor; + pub struct Database { mv_store: Option>, schema: Arc>, diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 2fa4fb65d..69418e2a4 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -4,6 +4,7 @@ use crate::storage::pager::Pager; use crate::storage::sqlite3_ondisk::{ read_varint, BTreeCell, PageContent, PageType, TableInteriorCell, TableLeafCell, }; +use crate::MvCursor; use crate::types::{CursorResult, OwnedValue, Record, SeekKey, SeekOp}; use crate::{return_corrupt, LimboError, Result}; @@ -135,6 +136,9 @@ impl CursorState { } pub struct BTreeCursor { + /// The multi-version cursor that is used to read and write to the database file. + mv_cursor: Option>>, + /// The pager that is used to read and write to the database file. pager: Rc, /// Page id of the root page used to go back up fast. root_page: usize, @@ -178,8 +182,13 @@ struct CellArray { } impl BTreeCursor { - pub fn new(pager: Rc, root_page: usize) -> Self { + pub fn new( + mv_cursor: Option>>, + pager: Rc, + root_page: usize, + ) -> Self { Self { + mv_cursor, pager, root_page, rowid: Cell::new(None), @@ -198,6 +207,10 @@ impl BTreeCursor { /// Check if the table is empty. /// This is done by checking if the root page has no cells. fn is_empty_table(&self) -> Result> { + if let Some(mv_cursor) = &self.mv_cursor { + let mv_cursor = mv_cursor.borrow(); + return Ok(CursorResult::Ok(mv_cursor.is_empty())); + } let page = self.pager.read_page(self.root_page)?; return_if_locked!(page); @@ -290,6 +303,19 @@ impl BTreeCursor { &mut self, predicate: Option<(SeekKey<'_>, SeekOp)>, ) -> Result, Option)>> { + if let Some(mv_cursor) = &self.mv_cursor { + let mut mv_cursor = mv_cursor.borrow_mut(); + let rowid = mv_cursor.current_row_id(); + match rowid { + Some(rowid) => { + let record = mv_cursor.current_row().unwrap().unwrap(); + let record: Record = crate::storage::sqlite3_ondisk::read_record(&record.data)?; + mv_cursor.forward(); + return Ok(CursorResult::Ok((Some(rowid.row_id), Some(record)))); + } + None => return Ok(CursorResult::Ok((None, None))), + } + } loop { let mem_page_rc = self.stack.top(); let cell_idx = self.stack.current_cell_index() as usize; @@ -592,6 +618,7 @@ impl BTreeCursor { } pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result> { + assert!(self.mv_cursor.is_none()); // For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers. // B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data. // @@ -1592,15 +1619,22 @@ impl BTreeCursor { } pub fn rewind(&mut self) -> Result> { - self.move_to_root(); + if let Some(_) = &self.mv_cursor { + let (rowid, record) = return_if_io!(self.get_next_record(None)); + self.rowid.replace(rowid); + self.record.replace(record); + } else { + self.move_to_root(); - let (rowid, record) = return_if_io!(self.get_next_record(None)); - self.rowid.replace(rowid); - self.record.replace(record); + let (rowid, record) = return_if_io!(self.get_next_record(None)); + self.rowid.replace(rowid); + self.record.replace(record); + } Ok(CursorResult::Ok(())) } pub fn last(&mut self) -> Result> { + assert!(self.mv_cursor.is_none()); match self.move_to_rightmost()? { CursorResult::Ok(_) => self.prev(), CursorResult::IO => Ok(CursorResult::IO), @@ -1615,6 +1649,7 @@ impl BTreeCursor { } pub fn prev(&mut self) -> Result> { + assert!(self.mv_cursor.is_none()); match self.get_prev_record()? { CursorResult::Ok((rowid, record)) => { self.rowid.replace(rowid); @@ -1631,10 +1666,15 @@ impl BTreeCursor { } pub fn rowid(&self) -> Result> { + if let Some(mv_cursor) = &self.mv_cursor { + let mv_cursor = mv_cursor.borrow(); + return Ok(mv_cursor.current_row_id().map(|rowid| rowid.row_id)); + } Ok(self.rowid.get()) } pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result> { + assert!(self.mv_cursor.is_none()); let (rowid, record) = return_if_io!(self.do_seek(key, op)); self.rowid.replace(rowid); self.record.replace(record); @@ -1648,23 +1688,35 @@ impl BTreeCursor { pub fn insert( &mut self, key: &OwnedValue, - _record: &Record, + record: &Record, moved_before: bool, /* Indicate whether it's necessary to traverse to find the leaf page */ ) -> Result> { let int_key = match key { OwnedValue::Integer(i) => i, _ => unreachable!("btree tables are indexed by integers!"), }; - if !moved_before { - return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ)); - } - - return_if_io!(self.insert_into_page(key, _record)); - self.rowid.replace(Some(*int_key as u64)); + match &self.mv_cursor { + Some(mv_cursor) => { + let row_id = + crate::mvcc::database::RowID::new(self.table_id() as u64, *int_key as u64); + let mut record_buf = Vec::new(); + record.serialize(&mut record_buf); + let row = crate::mvcc::database::Row::new(row_id, record_buf); + mv_cursor.borrow_mut().insert(row).unwrap(); + } + None => { + if !moved_before { + return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ)); + } + return_if_io!(self.insert_into_page(key, record)); + self.rowid.replace(Some(*int_key as u64)); + } + }; Ok(CursorResult::Ok(())) } pub fn delete(&mut self) -> Result> { + assert!(self.mv_cursor.is_none()); let page = self.stack.top(); return_if_locked!(page); @@ -1808,6 +1860,7 @@ impl BTreeCursor { } pub fn exists(&mut self, key: &OwnedValue) -> Result> { + assert!(self.mv_cursor.is_none()); let int_key = match key { OwnedValue::Integer(i) => i, _ => unreachable!("btree tables are indexed by integers!"), @@ -1927,6 +1980,10 @@ impl BTreeCursor { Ok(Some(n_overflow)) } + + pub fn table_id(&self) -> usize { + self.root_page + } } impl PageStack { @@ -2905,7 +2962,7 @@ mod tests { } fn validate_btree(pager: Rc, page_idx: usize) -> (usize, bool) { - let cursor = BTreeCursor::new(pager.clone(), page_idx); + let cursor = BTreeCursor::new(None, pager.clone(), page_idx); let page = pager.read_page(page_idx).unwrap(); let page = page.get(); let contents = page.contents.as_ref().unwrap(); @@ -2969,7 +3026,7 @@ mod tests { } fn format_btree(pager: Rc, page_idx: usize, depth: usize) -> String { - let cursor = BTreeCursor::new(pager.clone(), page_idx); + let cursor = BTreeCursor::new(None, pager.clone(), page_idx); let page = pager.read_page(page_idx).unwrap(); let page = page.get(); let contents = page.contents.as_ref().unwrap(); @@ -3100,7 +3157,7 @@ mod tests { .as_slice(), ] { let (pager, root_page) = empty_btree(); - let mut cursor = BTreeCursor::new(pager.clone(), root_page); + let mut cursor = BTreeCursor::new(None, pager.clone(), root_page); for (key, size) in sequence.iter() { run_until_done( || { @@ -3151,7 +3208,7 @@ mod tests { tracing::info!("super seed: {}", seed); for _ in 0..attempts { let (pager, root_page) = empty_btree(); - let mut cursor = BTreeCursor::new(pager.clone(), root_page); + let mut cursor = BTreeCursor::new(None, pager.clone(), root_page); let mut keys = Vec::new(); let seed = rng.next_u64(); tracing::info!("seed: {}", seed); @@ -3332,7 +3389,7 @@ mod tests { #[ignore] pub fn test_clear_overflow_pages() -> Result<()> { let (pager, db_header) = setup_test_env(5); - let cursor = BTreeCursor::new(pager.clone(), 1); + let cursor = BTreeCursor::new(None, pager.clone(), 1); let max_local = payload_overflow_threshold_max(PageType::TableLeaf, 4096); let usable_size = cursor.usable_space(); @@ -3430,7 +3487,7 @@ mod tests { #[test] pub fn test_clear_overflow_pages_no_overflow() -> Result<()> { let (pager, db_header) = setup_test_env(5); - let cursor = BTreeCursor::new(pager.clone(), 1); + let cursor = BTreeCursor::new(None, pager.clone(), 1); let small_payload = vec![b'A'; 10]; @@ -3869,7 +3926,7 @@ mod tests { let (pager, root_page) = empty_btree(); let mut keys = Vec::new(); for i in 0..10000 { - let mut cursor = BTreeCursor::new(pager.clone(), root_page); + let mut cursor = BTreeCursor::new(None, pager.clone(), root_page); tracing::info!("INSERT INTO t VALUES ({});", i,); let key = OwnedValue::Integer(i); let value = Record::new(vec![OwnedValue::Integer(i)]); @@ -3893,7 +3950,7 @@ mod tests { format_btree(pager.clone(), root_page, 0) ); for key in keys.iter() { - let mut cursor = BTreeCursor::new(pager.clone(), root_page); + let mut cursor = BTreeCursor::new(None, pager.clone(), root_page); let key = OwnedValue::Integer(*key); let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap(); assert!(exists, "key not found {}", key); diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 3dd208db1..7cb30f0c9 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -55,7 +55,9 @@ use crate::{ json::json_quote, json::json_remove, json::json_set, json::json_type, }; use crate::{info, CheckpointStatus}; -use crate::{resolve_ext_path, Connection, MvStore, Result, TransactionState, DATABASE_VERSION}; +use crate::{ + resolve_ext_path, Connection, MvCursor, MvStore, Result, TransactionState, DATABASE_VERSION, +}; use insn::{ exec_add, exec_and, exec_bit_and, exec_bit_not, exec_bit_or, exec_boolean_not, exec_concat, exec_divide, exec_multiply, exec_or, exec_remainder, exec_shift_left, exec_shift_right, @@ -762,7 +764,18 @@ impl Program { root_page, } => { let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap(); - let cursor = BTreeCursor::new(pager.clone(), *root_page); + let mv_cursor = match state.mv_tx_id { + Some(tx_id) => { + let table_id = *root_page as u64; + let mv_store = mv_store.as_ref().unwrap().clone(); + let mv_cursor = Rc::new(RefCell::new( + MvCursor::new(mv_store, tx_id, table_id).unwrap(), + )); + Some(mv_cursor) + } + None => None, + }; + let cursor = BTreeCursor::new(mv_cursor, pager.clone(), *root_page); let mut cursors = state.cursors.borrow_mut(); match cursor_type { CursorType::BTreeTable(_) => { @@ -2955,7 +2968,18 @@ impl Program { let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap(); let mut cursors = state.cursors.borrow_mut(); let is_index = cursor_type.is_index(); - let cursor = BTreeCursor::new(pager.clone(), *root_page); + let mv_cursor = match state.mv_tx_id { + Some(tx_id) => { + let table_id = *root_page as u64; + let mv_store = mv_store.as_ref().unwrap().clone(); + let mv_cursor = Rc::new(RefCell::new( + MvCursor::new(mv_store, tx_id, table_id).unwrap(), + )); + Some(mv_cursor) + } + None => None, + }; + let cursor = BTreeCursor::new(mv_cursor, pager.clone(), *root_page); if is_index { cursors .get_mut(*cursor_id)