From bc05497d99b6f1dbb6b656383db914a884835a8d Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 13 Oct 2025 19:26:18 +0200 Subject: [PATCH] core/mvcc: implement CursorTrait on MVCC cursor --- core/mvcc/cursor.rs | 326 +++++++++++++++++++++++++----------- core/mvcc/database/tests.rs | 72 ++++++-- core/storage/btree.rs | 78 ++++----- 3 files changed, 315 insertions(+), 161 deletions(-) diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index 0709b2641..f177181df 100644 --- a/core/mvcc/cursor.rs +++ b/core/mvcc/cursor.rs @@ -1,8 +1,10 @@ use crate::mvcc::clock::LogicalClock; use crate::mvcc::database::{MVTableId, MvStore, Row, RowID}; -use crate::types::{IOResult, SeekKey, SeekOp, SeekResult}; +use crate::storage::btree::{BTreeKey, CursorTrait}; +use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult}; use crate::Result; use crate::{Pager, Value}; +use std::cell::{Ref, RefCell}; use std::fmt::Debug; use std::ops::Bound; use std::sync::Arc; @@ -19,12 +21,14 @@ enum CursorPosition { #[derive(Debug)] pub struct MvccLazyCursor { pub db: Arc>, - current_pos: CursorPosition, + current_pos: RefCell, pub table_id: MVTableId, tx_id: u64, + /// Reusable immutable record, used to allow better allocation strategy. + reusable_immutable_record: RefCell>, } -impl MvccLazyCursor { +impl MvccLazyCursor { pub fn new( db: Arc>, tx_id: u64, @@ -36,55 +40,15 @@ impl MvccLazyCursor { let cursor = Self { db, tx_id, - current_pos: CursorPosition::BeforeFirst, + current_pos: RefCell::new(CursorPosition::BeforeFirst), table_id, + reusable_immutable_record: RefCell::new(None), }; Ok(cursor) } - /// Insert a row into the table. - /// Sets the cursor to the inserted row. - pub fn insert(&mut self, row: Row) -> Result<()> { - self.current_pos = CursorPosition::Loaded(row.id); - if self.db.read(self.tx_id, row.id)?.is_some() { - self.db.update(self.tx_id, row).inspect_err(|_| { - self.current_pos = CursorPosition::BeforeFirst; - })?; - } else { - self.db.insert(self.tx_id, row).inspect_err(|_| { - self.current_pos = CursorPosition::BeforeFirst; - })?; - } - Ok(()) - } - - pub fn delete(&mut self, rowid: RowID) -> Result<()> { - self.db.delete(self.tx_id, rowid)?; - Ok(()) - } - - pub fn current_row_id(&mut self) -> Option { - match self.current_pos { - CursorPosition::Loaded(id) => Some(id), - CursorPosition::BeforeFirst => { - // If we are before first, we need to try and find the first row. - let maybe_rowid = - self.db - .get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id); - if let Some(id) = maybe_rowid { - self.current_pos = CursorPosition::Loaded(id); - Some(id) - } else { - self.current_pos = CursorPosition::BeforeFirst; - None - } - } - CursorPosition::End => None, - } - } - - pub fn current_row(&mut self) -> Result> { - match self.current_pos { + pub fn current_row(&self) -> Result> { + match *self.current_pos.borrow() { CursorPosition::Loaded(id) => self.db.read(self.tx_id, id), CursorPosition::BeforeFirst => { // If we are before first, we need to try and find the first row. @@ -92,7 +56,7 @@ impl MvccLazyCursor { self.db .get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id); if let Some(id) = maybe_rowid { - self.current_pos = CursorPosition::Loaded(id); + self.current_pos.replace(CursorPosition::Loaded(id)); self.db.read(self.tx_id, id) } else { Ok(None) @@ -106,19 +70,57 @@ impl MvccLazyCursor { Ok(()) } + pub fn get_next_rowid(&mut self) -> i64 { + let _ = self.last(); + match *self.current_pos.borrow() { + CursorPosition::Loaded(id) => id.row_id + 1, + CursorPosition::BeforeFirst => 1, + CursorPosition::End => i64::MAX, + } + } + + fn get_immutable_record_or_create(&self) -> std::cell::RefMut<'_, Option> { + let mut reusable_immutable_record = self.reusable_immutable_record.borrow_mut(); + if reusable_immutable_record.is_none() { + let record = ImmutableRecord::new(1024); + reusable_immutable_record.replace(record); + } + reusable_immutable_record + } + + fn get_current_pos(&self) -> CursorPosition { + *self.current_pos.borrow() + } +} + +impl CursorTrait for MvccLazyCursor { + fn last(&mut self) -> Result> { + let last_rowid = self.db.get_last_rowid(self.table_id); + if let Some(last_rowid) = last_rowid { + self.current_pos.replace(CursorPosition::Loaded(RowID { + table_id: self.table_id, + row_id: last_rowid, + })); + } else { + self.current_pos.replace(CursorPosition::BeforeFirst); + } + Ok(IOResult::Done(())) + } + /// Move the cursor to the next row. Returns true if the cursor moved to the next row, false if the cursor is at the end of the table. - pub fn forward(&mut self) -> bool { - let before_first = matches!(self.current_pos, CursorPosition::BeforeFirst); - let min_id = match self.current_pos { + fn next(&mut self) -> Result> { + let before_first = matches!(self.get_current_pos(), CursorPosition::BeforeFirst); + let min_id = match *self.current_pos.borrow() { CursorPosition::Loaded(id) => id.row_id + 1, // TODO: do we need to forward twice? CursorPosition::BeforeFirst => i64::MIN, // we need to find first row, so we look from the first id, CursorPosition::End => { // let's keep same state, we reached the end so no point in moving forward. - return false; + return Ok(IOResult::Done(false)); } }; - self.current_pos = + + let new_position = match self .db .get_next_row_id_for_table(self.table_id, min_id, self.tx_id) @@ -134,46 +136,59 @@ impl MvccLazyCursor { } } }; - matches!(self.current_pos, CursorPosition::Loaded(_)) + self.current_pos.replace(new_position); + + Ok(IOResult::Done(matches!( + self.get_current_pos(), + CursorPosition::Loaded(_) + ))) } - /// Returns true if the is not pointing to any row. - pub fn is_empty(&self) -> bool { - // If we reached the end of the table, it means we traversed the whole table therefore there must be something in the table. - // If we have loaded a row, it means there is something in the table. - match self.current_pos { - CursorPosition::Loaded(_) => false, - CursorPosition::BeforeFirst => true, - CursorPosition::End => true, + fn prev(&mut self) -> Result> { + todo!() + } + + fn rowid(&self) -> Result>> { + let rowid = match self.get_current_pos() { + CursorPosition::Loaded(id) => Some(id.row_id), + CursorPosition::BeforeFirst => { + // If we are before first, we need to try and find the first row. + let maybe_rowid = + self.db + .get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id); + if let Some(id) = maybe_rowid { + self.current_pos.replace(CursorPosition::Loaded(id)); + Some(id.row_id) + } else { + self.current_pos.replace(CursorPosition::BeforeFirst); + None + } + } + CursorPosition::End => None, + }; + Ok(IOResult::Done(rowid)) + } + + fn record( + &self, + ) -> Result>>> { + let Some(row) = self.current_row()? else { + return Ok(IOResult::Done(None)); + }; + + { + let mut record = self.get_immutable_record_or_create(); + let record = record.as_mut().unwrap(); + record.invalidate(); + record.start_serialization(&row.data); } + + let record_ref = + Ref::filter_map(self.reusable_immutable_record.borrow(), |opt| opt.as_ref()).unwrap(); + Ok(IOResult::Done(Some(record_ref))) } - pub fn rewind(&mut self) { - self.current_pos = CursorPosition::BeforeFirst; - } - - pub fn last(&mut self) { - let last_rowid = self.db.get_last_rowid(self.table_id); - if let Some(last_rowid) = last_rowid { - self.current_pos = CursorPosition::Loaded(RowID { - table_id: self.table_id, - row_id: last_rowid, - }); - } else { - self.current_pos = CursorPosition::BeforeFirst; - } - } - - pub fn get_next_rowid(&mut self) -> i64 { - self.last(); - match self.current_pos { - CursorPosition::Loaded(id) => id.row_id + 1, - CursorPosition::BeforeFirst => 1, - CursorPosition::End => i64::MAX, - } - } - - pub fn seek(&mut self, seek_key: SeekKey<'_>, op: SeekOp) -> Result> { + fn seek(&mut self, seek_key: SeekKey<'_>, op: SeekOp) -> Result> { let row_id = match seek_key { SeekKey::TableRowId(row_id) => row_id, SeekKey::IndexKey(_) => { @@ -196,7 +211,7 @@ impl MvccLazyCursor { }; let rowid = self.db.seek_rowid(bound, lower_bound, self.tx_id); if let Some(rowid) = rowid { - self.current_pos = CursorPosition::Loaded(rowid); + self.current_pos.replace(CursorPosition::Loaded(rowid)); if op.eq_only() { if rowid.row_id == row_id { Ok(IOResult::Done(SeekResult::Found)) @@ -209,15 +224,59 @@ impl MvccLazyCursor { } else { let forwards = matches!(op, SeekOp::GE { eq_only: _ } | SeekOp::GT); if forwards { - self.last(); + let _ = self.last()?; } else { - self.rewind(); + let _ = self.rewind()?; } Ok(IOResult::Done(SeekResult::NotFound)) } } - pub fn exists(&mut self, key: &Value) -> Result> { + /// Insert a row into the table. + /// Sets the cursor to the inserted row. + fn insert(&mut self, key: &BTreeKey) -> Result> { + let Some(rowid) = key.maybe_rowid() else { + todo!() + }; + let row_id = RowID::new(self.table_id, rowid); + let record_buf = key.get_record().unwrap().get_payload().to_vec(); + let num_columns = match key { + BTreeKey::IndexKey(record) => record.column_count(), + BTreeKey::TableRowId((_, record)) => record.as_ref().unwrap().column_count(), + }; + let row = crate::mvcc::database::Row::new(row_id, record_buf, num_columns); + + self.current_pos.replace(CursorPosition::Loaded(row.id)); + if self.db.read(self.tx_id, row.id)?.is_some() { + self.db.update(self.tx_id, row).inspect_err(|_| { + self.current_pos.replace(CursorPosition::BeforeFirst); + })?; + } else { + self.db.insert(self.tx_id, row).inspect_err(|_| { + self.current_pos.replace(CursorPosition::BeforeFirst); + })?; + } + Ok(IOResult::Done(())) + } + + fn delete(&mut self) -> Result> { + let IOResult::Done(Some(rowid)) = self.rowid()? else { + todo!(); + }; + let rowid = RowID::new(self.table_id, rowid); + self.db.delete(self.tx_id, rowid)?; + Ok(IOResult::Done(())) + } + + fn set_null_flag(&mut self, _flag: bool) { + todo!() + } + + fn get_null_flag(&self) -> bool { + todo!() + } + + fn exists(&mut self, key: &Value) -> Result> { let int_key = match key { Value::Integer(i) => i, _ => unreachable!("btree tables are indexed by integers!"), @@ -234,11 +293,90 @@ impl MvccLazyCursor { ) .is_some(); if exists { - self.current_pos = CursorPosition::Loaded(RowID { + self.current_pos.replace(CursorPosition::Loaded(RowID { table_id: self.table_id, row_id: *int_key, - }); + })); } Ok(IOResult::Done(exists)) } + + fn clear_btree(&mut self) -> Result>> { + todo!() + } + + fn btree_destroy(&mut self) -> Result>> { + todo!() + } + + fn count(&mut self) -> Result> { + todo!() + } + + /// Returns true if the is not pointing to any row. + fn is_empty(&self) -> bool { + // If we reached the end of the table, it means we traversed the whole table therefore there must be something in the table. + // If we have loaded a row, it means there is something in the table. + match self.get_current_pos() { + CursorPosition::Loaded(_) => false, + CursorPosition::BeforeFirst => true, + CursorPosition::End => true, + } + } + + fn root_page(&self) -> i64 { + self.table_id.into() + } + + fn rewind(&mut self) -> Result> { + self.current_pos.replace(CursorPosition::BeforeFirst); + Ok(IOResult::Done(())) + } + + fn has_record(&self) -> bool { + todo!() + } + + fn set_has_record(&self, _has_record: bool) { + todo!() + } + + fn get_index_info(&self) -> &crate::types::IndexInfo { + todo!() + } + + fn seek_end(&mut self) -> Result> { + todo!() + } + + fn seek_to_last(&mut self) -> Result> { + todo!() + } + + fn invalidate_record(&mut self) { + self.get_immutable_record_or_create() + .as_mut() + .unwrap() + .invalidate(); + } + + fn has_rowid(&self) -> bool { + todo!() + } + + fn record_cursor_mut(&self) -> std::cell::RefMut<'_, crate::types::RecordCursor> { + todo!() + } + + fn get_pager(&self) -> Arc { + todo!() + } + + fn get_skip_advance(&self) -> bool { + todo!() + } + + fn get_mvcc_cursor(&self) -> Arc> { + todo!() + } } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 086391064..9198c5825 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -115,6 +115,10 @@ pub(crate) fn generate_simple_string_row(table_id: MVTableId, id: i64, data: &st } } +pub(crate) fn generate_simple_string_record(data: &str) -> ImmutableRecord { + ImmutableRecord::from_values(&[Value::Text(Text::new(data))], 1) +} + #[test] fn test_insert_read() { let db = MvccTestDb::new(); @@ -830,14 +834,21 @@ fn test_lazy_scan_cursor_basic() { .unwrap(); // Check first row - assert!(cursor.forward()); + assert!(matches!(cursor.next().unwrap(), IOResult::Done(true))); assert!(!cursor.is_empty()); let row = cursor.current_row().unwrap().unwrap(); assert_eq!(row.id.row_id, 1); // Iterate through all rows let mut count = 1; - while cursor.forward() { + loop { + let res = cursor.next().unwrap(); + let IOResult::Done(res) = res else { + panic!("unexpected next result {res:?}"); + }; + if !res { + break; + } count += 1; let row = cursor.current_row().unwrap().unwrap(); assert_eq!(row.id.row_id, count); @@ -847,7 +858,7 @@ fn test_lazy_scan_cursor_basic() { assert_eq!(count, 5); // After the last row, is_empty should return true - assert!(!cursor.forward()); + assert!(!matches!(cursor.next().unwrap(), IOResult::Done(true))); assert!(cursor.is_empty()); } @@ -865,7 +876,7 @@ fn test_lazy_scan_cursor_with_gaps() { .unwrap(); // Check first row - assert!(cursor.forward()); + assert!(matches!(cursor.next().unwrap(), IOResult::Done(true))); assert!(!cursor.is_empty()); let row = cursor.current_row().unwrap().unwrap(); assert_eq!(row.id.row_id, 5); @@ -874,12 +885,27 @@ fn test_lazy_scan_cursor_with_gaps() { let expected_ids = [5, 10, 15, 20, 30]; let mut index = 0; - assert_eq!(cursor.current_row_id().unwrap().row_id, expected_ids[index]); + let IOResult::Done(rowid) = cursor.rowid().unwrap() else { + unreachable!(); + }; + let rowid = rowid.unwrap(); + assert_eq!(rowid, expected_ids[index]); - while cursor.forward() { + loop { + let res = cursor.next().unwrap(); + let IOResult::Done(res) = res else { + panic!("unexpected next result {res:?}"); + }; + if !res { + break; + } index += 1; if index < expected_ids.len() { - assert_eq!(cursor.current_row_id().unwrap().row_id, expected_ids[index]); + let IOResult::Done(rowid) = cursor.rowid().unwrap() else { + unreachable!(); + }; + let rowid = rowid.unwrap(); + assert_eq!(rowid, expected_ids[index]); } } @@ -900,7 +926,7 @@ fn test_cursor_basic() { ) .unwrap(); - cursor.forward(); + let _ = cursor.next().unwrap(); // Check first row assert!(!cursor.is_empty()); @@ -909,7 +935,14 @@ fn test_cursor_basic() { // Iterate through all rows let mut count = 1; - while cursor.forward() { + loop { + let res = cursor.next().unwrap(); + let IOResult::Done(res) = res else { + panic!("unexpected next result {res:?}"); + }; + if !res { + break; + } count += 1; let row = cursor.current_row().unwrap().unwrap(); assert_eq!(row.id.row_id, count); @@ -919,7 +952,7 @@ fn test_cursor_basic() { assert_eq!(count, 5); // After the last row, is_empty should return true - assert!(!cursor.forward()); + assert!(!matches!(cursor.next().unwrap(), IOResult::Done(true))); assert!(cursor.is_empty()); } @@ -939,7 +972,7 @@ fn test_cursor_with_empty_table() { let table_id = -1; // Empty table // Test LazyScanCursor with empty table - let mut cursor = MvccLazyCursor::new( + let cursor = MvccLazyCursor::new( db.mvcc_store.clone(), tx_id, table_id, @@ -947,7 +980,8 @@ fn test_cursor_with_empty_table() { ) .unwrap(); assert!(cursor.is_empty()); - assert!(cursor.current_row_id().is_none()); + let rowid = cursor.rowid().unwrap(); + assert!(matches!(rowid, IOResult::Done(None))); } #[test] @@ -964,15 +998,17 @@ fn test_cursor_modification_during_scan() { .unwrap(); // Read first row - assert!(cursor.forward()); + assert!(matches!(cursor.next().unwrap(), IOResult::Done(true))); let first_row = cursor.current_row().unwrap().unwrap(); assert_eq!(first_row.id.row_id, 1); // Insert a new row with ID between existing rows let new_row_id = RowID::new(table_id.into(), 3); - let new_row = generate_simple_string_row(table_id.into(), new_row_id.row_id, "new_row"); + let new_row = generate_simple_string_record("new_row"); - cursor.insert(new_row).unwrap(); + let _ = cursor + .insert(&BTreeKey::TableRowId((new_row_id.row_id, Some(&new_row)))) + .unwrap(); let row = db.mvcc_store.read(tx_id, new_row_id).unwrap().unwrap(); let mut record = ImmutableRecord::new(1024); record.start_serialization(&row.data); @@ -986,7 +1022,7 @@ fn test_cursor_modification_during_scan() { assert_eq!(row.id.row_id, 3); // Continue scanning - the cursor should still work correctly - cursor.forward(); // Move to 4 + let _ = cursor.next().unwrap(); // Move to 4 let row = db .mvcc_store .read(tx_id, RowID::new(table_id.into(), 4)) @@ -994,14 +1030,14 @@ fn test_cursor_modification_during_scan() { .unwrap(); assert_eq!(row.id.row_id, 4); - cursor.forward(); // Move to 5 (our new row) + let _ = cursor.next().unwrap(); // Move to 5 (our new row) let row = db .mvcc_store .read(tx_id, RowID::new(table_id.into(), 5)) .unwrap() .unwrap(); assert_eq!(row.id.row_id, 5); - assert!(!cursor.forward()); + assert!(!matches!(cursor.next().unwrap(), IOResult::Done(true))); assert!(cursor.is_empty()); } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 0b7489530..633c97599 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -337,7 +337,7 @@ impl BTreeKey<'_> { } /// Get the record, if present. Index will always be present, - fn get_record(&self) -> Option<&'_ ImmutableRecord> { + pub fn get_record(&self) -> Option<&'_ ImmutableRecord> { match self { BTreeKey::TableRowId((_, record)) => *record, BTreeKey::IndexKey(record) => Some(record), @@ -345,7 +345,7 @@ impl BTreeKey<'_> { } /// Get the rowid, if present. Index will never be present. - fn maybe_rowid(&self) -> Option { + pub fn maybe_rowid(&self) -> Option { match self { BTreeKey::TableRowId((rowid, _)) => Some(*rowid), BTreeKey::IndexKey(_) => None, @@ -1297,8 +1297,10 @@ impl BTreeCursor { pub fn get_next_record(&mut self) -> Result> { if let Some(mv_cursor) = &self.mv_cursor { let mut mv_cursor = mv_cursor.write(); - mv_cursor.forward(); - let rowid = mv_cursor.current_row_id(); + assert!(matches!(mv_cursor.next()?, IOResult::Done(_))); + let IOResult::Done(rowid) = mv_cursor.rowid()? else { + todo!() + }; match rowid { Some(_rowid) => { return Ok(IOResult::Done(true)); @@ -4453,11 +4455,14 @@ impl BTreeCursor { #[instrument(skip(self), level = Level::DEBUG)] pub fn rowid(&self) -> Result>> { if let Some(mv_cursor) = &self.mv_cursor { - let mut mv_cursor = mv_cursor.write(); - let Some(rowid) = mv_cursor.current_row_id() else { + let mv_cursor = mv_cursor.write(); + let IOResult::Done(rowid) = mv_cursor.rowid()? else { + todo!() + }; + let Some(rowid) = rowid else { return Ok(IOResult::Done(None)); }; - return Ok(IOResult::Done(Some(rowid.row_id))); + return Ok(IOResult::Done(Some(rowid))); } if self.get_null_flag() { return Ok(IOResult::Done(None)); @@ -4520,7 +4525,7 @@ impl BTreeCursor { return Ok(IOResult::Done(Some(record_ref))); } if let Some(mv_cursor) = &self.mv_cursor { - let mut mv_cursor = mv_cursor.write(); + let mv_cursor = mv_cursor.write(); let Some(row) = mv_cursor.current_row()? else { return Ok(IOResult::Done(None)); }; @@ -4586,22 +4591,9 @@ impl BTreeCursor { pub fn insert(&mut self, key: &BTreeKey) -> Result> { tracing::debug!(valid_state = ?self.valid_state, cursor_state = ?self.state, is_write_in_progress = self.is_write_in_progress()); match &self.mv_cursor { - Some(mv_cursor) => match key.maybe_rowid() { - Some(rowid) => { - let row_id = - crate::mvcc::database::RowID::new(mv_cursor.read().table_id, rowid); - let record_buf = key.get_record().unwrap().get_payload().to_vec(); - let num_columns = match key { - BTreeKey::IndexKey(record) => record.column_count(), - BTreeKey::TableRowId((_, record)) => { - record.as_ref().unwrap().column_count() - } - }; - let row = crate::mvcc::database::Row::new(row_id, record_buf, num_columns); - mv_cursor.write().insert(row)?; - } - None => todo!("Support mvcc inserts with index btrees"), - }, + Some(mv_cursor) => { + return_if_io!(mv_cursor.write().insert(key)); + } None => { return_if_io!(self.insert_into_page(key)); if key.maybe_rowid().is_some() { @@ -4627,8 +4619,7 @@ impl BTreeCursor { #[instrument(skip(self), level = Level::DEBUG)] pub fn delete(&mut self) -> Result> { if let Some(mv_cursor) = &self.mv_cursor { - let rowid = mv_cursor.write().current_row_id().unwrap(); - mv_cursor.write().delete(rowid)?; + return_if_io!(mv_cursor.write().delete()); return Ok(IOResult::Done(())); } @@ -5679,11 +5670,14 @@ impl CursorTrait for BTreeCursor { #[instrument(skip(self), level = Level::DEBUG)] fn rowid(&self) -> Result>> { if let Some(mv_cursor) = &self.mv_cursor { - let mut mv_cursor = mv_cursor.write(); - let Some(rowid) = mv_cursor.current_row_id() else { + let mv_cursor = mv_cursor.write(); + let IOResult::Done(rowid) = mv_cursor.rowid()? else { + todo!(); + }; + let Some(rowid) = rowid else { return Ok(IOResult::Done(None)); }; - return Ok(IOResult::Done(Some(rowid.row_id))); + return Ok(IOResult::Done(Some(rowid))); } if self.get_null_flag() { return Ok(IOResult::Done(None)); @@ -5743,7 +5737,7 @@ impl CursorTrait for BTreeCursor { return Ok(IOResult::Done(Some(record_ref))); } if let Some(mv_cursor) = &self.mv_cursor { - let mut mv_cursor = mv_cursor.write(); + let mv_cursor = mv_cursor.write(); let Some(row) = mv_cursor.current_row()? else { return Ok(IOResult::Done(None)); }; @@ -5809,22 +5803,9 @@ impl CursorTrait for BTreeCursor { fn insert(&mut self, key: &BTreeKey) -> Result> { tracing::debug!(valid_state = ?self.valid_state, cursor_state = ?self.state, is_write_in_progress = self.is_write_in_progress()); match &self.mv_cursor { - Some(mv_cursor) => match key.maybe_rowid() { - Some(rowid) => { - let row_id = - crate::mvcc::database::RowID::new(mv_cursor.read().table_id, rowid); - let record_buf = key.get_record().unwrap().get_payload().to_vec(); - let num_columns = match key { - BTreeKey::IndexKey(record) => record.column_count(), - BTreeKey::TableRowId((_, record)) => { - record.as_ref().unwrap().column_count() - } - }; - let row = crate::mvcc::database::Row::new(row_id, record_buf, num_columns); - mv_cursor.write().insert(row)?; - } - None => todo!("Support mvcc inserts with index btrees"), - }, + Some(mv_cursor) => { + return_if_io!(mv_cursor.write().insert(key)); + } None => { return_if_io!(self.insert_into_page(key)); if key.maybe_rowid().is_some() { @@ -5838,8 +5819,7 @@ impl CursorTrait for BTreeCursor { #[instrument(skip(self), level = Level::DEBUG)] fn delete(&mut self) -> Result> { if let Some(mv_cursor) = &self.mv_cursor { - let rowid = mv_cursor.write().current_row_id().unwrap(); - mv_cursor.write().delete(rowid)?; + return_if_io!(mv_cursor.write().delete()); return Ok(IOResult::Done(())); } @@ -6361,7 +6341,7 @@ impl CursorTrait for BTreeCursor { self.rewind_state = RewindState::NextRecord; if let Some(mv_cursor) = &self.mv_cursor { let mut mv_cursor = mv_cursor.write(); - mv_cursor.rewind(); + return_if_io!(mv_cursor.rewind()); } else { let c = self.move_to_root()?; if let Some(c) = c {