use parking_lot::RwLock; use crate::mvcc::clock::LogicalClock; use crate::mvcc::database::{MVTableId, MvStore, Row, RowID, RowVersionState}; use crate::storage::btree::{BTreeCursor, BTreeKey, CursorTrait}; use crate::translate::plan::IterationDirection; use crate::types::{IOResult, ImmutableRecord, RecordCursor, SeekKey, SeekOp, SeekResult}; use crate::{return_if_io, Result}; use crate::{Pager, Value}; use std::any::Any; use std::cell::{Ref, RefCell}; use std::fmt::Debug; use std::ops::Bound; use std::sync::Arc; #[derive(Debug, Copy, Clone)] enum CursorPosition { /// We haven't loaded any row yet. BeforeFirst, /// We have loaded a row. This position points to a rowid in either MVCC index or in BTree. Loaded { row_id: RowID, /// Indicates whether the rowid is pointing BTreeCursor or MVCC index. in_btree: bool, /// Indicates whether the rowid poiting in BTreeCursor has been consumed. /// This is required to know whether `next` or `prev` has been called but not yet consumed. btree_consumed: bool, }, /// We have reached the end of the table. End, } #[derive(Debug, Clone, Copy)] enum NextState { NextBtree { new_position_in_mvcc: CursorPosition, }, } #[derive(Debug, Clone, Copy)] enum PrevState { PrevBtree { new_position_in_mvcc: CursorPosition, }, } #[derive(Debug, Clone, Copy)] enum MvccLazyCursorState { Next(NextState), Prev(PrevState), } pub struct MvccLazyCursor { pub db: Arc>, current_pos: RefCell, pub table_id: MVTableId, tx_id: u64, /// Reusable immutable record, used to allow better allocation strategy. reusable_immutable_record: RefCell>, btree_cursor: Box, null_flag: bool, record_cursor: RefCell, next_rowid_lock: Arc>, state: RefCell>, } impl MvccLazyCursor { pub fn new( db: Arc>, tx_id: u64, root_page_or_table_id: i64, btree_cursor: Box, ) -> Result> { assert!( (&*btree_cursor as &dyn Any).is::(), "BTreeCursor expected for mvcc cursor" ); let table_id = db.get_table_id_from_root_page(root_page_or_table_id); Ok(Self { db, tx_id, current_pos: RefCell::new(CursorPosition::BeforeFirst), table_id, reusable_immutable_record: RefCell::new(None), btree_cursor, null_flag: false, record_cursor: RefCell::new(RecordCursor::new()), next_rowid_lock: Arc::new(RwLock::new(())), state: RefCell::new(None), }) } /// Returns the current row as an immutable record. pub fn current_row( &self, ) -> Result>>> { match *self.current_pos.borrow() { CursorPosition::Loaded { row_id: _, in_btree, btree_consumed: _, } => { if in_btree { let IOResult::Done(Some(record)) = self.btree_cursor.record()? else { panic!("BTree should have returned record"); }; Ok(IOResult::Done(Some(record))) } else { let Some(row) = self.read_mvcc_current_row()? else { return Ok(IOResult::Done(None)); }; { let mut record = self.get_immutable_record_or_create(); let record = record.as_mut().unwrap(); record.invalidate(); record.start_serialization(&row.data); } let record_ref = Ref::filter_map(self.reusable_immutable_record.borrow(), |opt| { opt.as_ref() }) .unwrap(); Ok(IOResult::Done(Some(record_ref))) } } CursorPosition::BeforeFirst => { // Before first is not a valid position, so we return none. Ok(IOResult::Done(None)) } CursorPosition::End => Ok(IOResult::Done(None)), } } pub fn read_mvcc_current_row(&self) -> Result> { let row_id = match *self.current_pos.borrow() { CursorPosition::Loaded { row_id, in_btree, btree_consumed: _, } if !in_btree => row_id, _ => panic!("invalid position to read current mvcc row"), }; self.db.read(self.tx_id, row_id) } pub fn close(self) -> Result<()> { Ok(()) } pub fn get_next_rowid(&mut self) -> i64 { // lock so we don't get same two rowids let lock = self.next_rowid_lock.clone(); let _lock = lock.write(); let _ = self.last(); match *self.current_pos.borrow() { CursorPosition::Loaded { row_id, in_btree: _, btree_consumed: _, } => row_id.row_id + 1, CursorPosition::BeforeFirst => 1, CursorPosition::End => 1, } } fn get_immutable_record_or_create(&self) -> std::cell::RefMut<'_, Option> { let mut reusable_immutable_record = self.reusable_immutable_record.borrow_mut(); if reusable_immutable_record.is_none() { let record = ImmutableRecord::new(1024); reusable_immutable_record.replace(record); } reusable_immutable_record } fn get_current_pos(&self) -> CursorPosition { *self.current_pos.borrow() } /// Returns the new position of the cursor based on the new position in MVCC and the current rowid in BTree. /// If we are moving forwards -> choose smaller rowid (mvcc if mvcc < btree, else btree) /// If we are moving backwards -> choose larger rowid (mvcc if mvcc > btree, else btree) fn get_new_position_from_mvcc_and_btree( &mut self, new_position_in_mvcc: &Option, current_rowid_in_btree: &Option, direction: IterationDirection, ) -> CursorPosition { tracing::trace!("get_new_position_from_mvcc_and_btree(new_position_in_mvcc={:?}, current_rowid_in_btree={:?}, direction={:?})", new_position_in_mvcc, current_rowid_in_btree, direction); match (new_position_in_mvcc, current_rowid_in_btree) { (Some(mvcc_rowid), Some(btree_rowid)) => { // When forwards: choose smaller rowid (mvcc if mvcc < btree, else btree) // When backwards: choose larger rowid (mvcc if mvcc > btree, else btree) let use_mvcc = if direction == IterationDirection::Forwards { mvcc_rowid < btree_rowid } else { mvcc_rowid > btree_rowid }; let (row_id, in_btree, btree_consumed) = if use_mvcc { (*mvcc_rowid, false, false) } else { (*btree_rowid, true, true) }; CursorPosition::Loaded { row_id: RowID { table_id: self.table_id, row_id, }, in_btree, btree_consumed, } } (None, Some(btree_rowid)) => CursorPosition::Loaded { row_id: RowID { table_id: self.table_id, row_id: *btree_rowid, }, in_btree: true, btree_consumed: true, }, (Some(mvcc_rowid), None) => CursorPosition::Loaded { row_id: RowID { table_id: self.table_id, row_id: *mvcc_rowid, }, in_btree: false, btree_consumed: true, }, (None, None) => CursorPosition::End, } } fn is_btree_allocated(&self) -> bool { let maybe_root_page = self.db.table_id_to_rootpage.get(&self.table_id); maybe_root_page.is_some_and(|entry| entry.value().is_some()) } fn query_btree_version_is_valid(&self, row_id: i64) -> bool { // If the row is not found in MVCC index, this means row_id is valid in btree matches!( self.db .find_row_last_version_state(self.table_id, row_id, self.tx_id), RowVersionState::NotFound ) } } impl CursorTrait for MvccLazyCursor { fn last(&mut self) -> Result> { if self.is_btree_allocated() { return_if_io!(self.btree_cursor.last()); } self.invalidate_record(); self.current_pos.replace(CursorPosition::End); let position_in_mvcc = self.db.get_last_rowid(self.table_id); let position_in_btree = if self.is_btree_allocated() { let IOResult::Done(maybe_rowid) = self.btree_cursor.rowid()? else { panic!("BTree should have returned rowid after last"); }; maybe_rowid } else { None }; let new_position = self.get_new_position_from_mvcc_and_btree( &position_in_mvcc, &position_in_btree, IterationDirection::Backwards, ); self.current_pos.replace(new_position); self.invalidate_record(); Ok(IOResult::Done(())) } /// Move the cursor to the next row. Returns true if the cursor moved to the next row, false if the cursor is at the end of the table. fn next(&mut self) -> Result> { let current_state = *self.state.borrow(); if current_state.is_none() { let before_first = matches!(self.get_current_pos(), CursorPosition::BeforeFirst); let min_id = match *self.current_pos.borrow() { CursorPosition::Loaded { row_id, in_btree: _, btree_consumed: _, } => row_id.row_id + 1, // TODO: do we need to forward twice? CursorPosition::BeforeFirst => i64::MIN, // we need to find first row, so we look from the first id, CursorPosition::End => { // let's keep same state, we reached the end so no point in moving forward. return Ok(IOResult::Done(false)); } }; let new_position_in_mvcc = match self .db .get_next_row_id_for_table(self.table_id, min_id, self.tx_id) { Some(id) => CursorPosition::Loaded { row_id: id, in_btree: false, btree_consumed: false, }, None => { if before_first { // if it wasn't loaded and we didn't find anything, it means the table is empty. CursorPosition::BeforeFirst } else { // if we had something loaded, and we didn't find next key then it means we are at the end. CursorPosition::End } } }; self.state .replace(Some(MvccLazyCursorState::Next(NextState::NextBtree { new_position_in_mvcc, }))); } // Now we need to loop for next rowid in btree that is valid. // FIXME: this is quite unperformant, we should find a better way to do this. loop { let current_state = *self.state.borrow(); let Some(MvccLazyCursorState::Next(NextState::NextBtree { new_position_in_mvcc, })) = current_state else { panic!("Invalid state {:?}", self.state.borrow()); }; // Check whether we have already consumed the rowid in btree. In BeforeFirst we can assume we haven't started calling next yet. // In End we can assume we have already called next and it returned false, so we can assume we have consumed the rowid. let btree_consumed = match self.get_current_pos() { CursorPosition::Loaded { row_id: _, in_btree: _, btree_consumed, } => btree_consumed, CursorPosition::BeforeFirst => true, CursorPosition::End => true, }; let found = if self.is_btree_allocated() { // If we have a functional btree, let's either find next value, or use the one pointed at by the cursor. if btree_consumed { return_if_io!(self.btree_cursor.next()) } else { true } } else { // If we don't have a functional btree, we can't find next value, so we return false. false }; // get current rowid in mvcc and in btree // compare both and set loaded to position of the one that is lesser let new_position_in_mvcc = match new_position_in_mvcc { CursorPosition::Loaded { row_id, in_btree: _, btree_consumed: _, } => Some(row_id.row_id), CursorPosition::BeforeFirst => None, CursorPosition::End => None, }; let current_rowid_in_btree = if found { let IOResult::Done(Some(rowid)) = self.btree_cursor.rowid()? else { panic!("BTree should have returned rowid after next"); }; if self.query_btree_version_is_valid(rowid) { Some(rowid) } else { // if the row is not valid, we need to continue to the next rowid in btree. // We first set consumed to true so that next time we call next, we don't use the same rowid. if let CursorPosition::Loaded { btree_consumed, .. } = &mut *self.current_pos.borrow_mut() { *btree_consumed = true; } continue; } } else { None }; let new_position = self.get_new_position_from_mvcc_and_btree( &new_position_in_mvcc, ¤t_rowid_in_btree, IterationDirection::Forwards, ); self.current_pos.replace(new_position); self.invalidate_record(); self.state.replace(None); return Ok(IOResult::Done(matches!( self.get_current_pos(), CursorPosition::Loaded { .. } ))); } } fn prev(&mut self) -> Result> { let current_state = *self.state.borrow(); if current_state.is_none() { let max_id = match *self.current_pos.borrow() { CursorPosition::Loaded { row_id, in_btree: _, btree_consumed: _, } => row_id.row_id, CursorPosition::BeforeFirst => { return Ok(IOResult::Done(false)); } CursorPosition::End => { i64::MAX // we need to find last row, so we look from the last id, } }; let new_position_in_mvcc = match self .db .get_prev_row_id_for_table(self.table_id, max_id, self.tx_id) { Some(id) => CursorPosition::Loaded { row_id: id, in_btree: false, btree_consumed: false, }, None => CursorPosition::BeforeFirst, }; self.state .replace(Some(MvccLazyCursorState::Prev(PrevState::PrevBtree { new_position_in_mvcc, }))); } // Now we need to loop for prev rowid in btree that is valid. // FIXME: this is quite unperformant, we should find a better way to do this. loop { let current_state = *self.state.borrow(); let Some(MvccLazyCursorState::Prev(PrevState::PrevBtree { new_position_in_mvcc, })) = current_state else { panic!("Invalid state {:?}", self.state.borrow()); }; // Check whether we have already consumed the rowid in btree. In BeforeFirst we can assume we haven't started calling next yet. // In End we can assume we have already called next and it returned false, so we can assume we have consumed the rowid. let btree_consumed = match self.get_current_pos() { CursorPosition::Loaded { row_id: _, in_btree: _, btree_consumed, } => btree_consumed, CursorPosition::BeforeFirst => true, CursorPosition::End => true, }; let found = if self.is_btree_allocated() { // If we have a functional btree, let's either find next value, or use the one pointed at by the cursor. if btree_consumed { return_if_io!(self.btree_cursor.prev()) } else { true } } else { // If we don't have a functional btree, we can't find next value, so we return false. false }; // get current rowid in mvcc and in btree let maybe_rowid_mvcc = match new_position_in_mvcc { CursorPosition::Loaded { row_id, in_btree: _, btree_consumed: _, } => Some(row_id.row_id), CursorPosition::BeforeFirst => None, CursorPosition::End => None, }; let maybe_rowid_in_btree = if found { let IOResult::Done(Some(rowid)) = self.btree_cursor.rowid()? else { panic!("BTree should have returned rowid after prev because we found a valid rowid after calling btree_cursor.prev()"); }; if self.query_btree_version_is_valid(rowid) { Some(rowid) } else { // if the row is not valid, we need to continue to the next rowid in btree. // We first set consumed to true so that next time we call next, we don't use the same rowid. if let CursorPosition::Loaded { btree_consumed, .. } = &mut *self.current_pos.borrow_mut() { *btree_consumed = true; } continue; } } else { None }; // Update based on direction. let new_position = self.get_new_position_from_mvcc_and_btree( &maybe_rowid_mvcc, &maybe_rowid_in_btree, IterationDirection::Backwards, ); self.current_pos.replace(new_position); self.invalidate_record(); self.state.replace(None); return Ok(IOResult::Done(matches!( self.get_current_pos(), CursorPosition::Loaded { .. } ))); } } fn rowid(&self) -> Result>> { let rowid = match self.get_current_pos() { CursorPosition::Loaded { row_id, in_btree: _, btree_consumed: _, } => Some(row_id.row_id), CursorPosition::BeforeFirst => None, CursorPosition::End => None, }; Ok(IOResult::Done(rowid)) } fn record( &self, ) -> Result>>> { self.current_row() } fn seek(&mut self, seek_key: SeekKey<'_>, op: SeekOp) -> Result> { let row_id = match seek_key { SeekKey::TableRowId(row_id) => row_id, SeekKey::IndexKey(_) => { todo!(); } }; // gt -> lower_bound bound excluded, we want first row after row_id // ge -> lower_bound bound included, we want first row equal to row_id or first row after row_id // lt -> upper_bound bound excluded, we want last row before row_id // le -> upper_bound bound included, we want last row equal to row_id or first row before row_id let rowid = RowID { table_id: self.table_id, row_id, }; let (bound, lower_bound) = match op { SeekOp::GT => (Bound::Excluded(&rowid), true), SeekOp::GE { eq_only: _ } => (Bound::Included(&rowid), true), SeekOp::LT => (Bound::Excluded(&rowid), false), SeekOp::LE { eq_only: _ } => (Bound::Included(&rowid), false), }; self.invalidate_record(); let rowid = self.db.seek_rowid(bound, lower_bound, self.tx_id); if let Some(rowid) = rowid { self.current_pos.replace(CursorPosition::Loaded { row_id: rowid, in_btree: false, btree_consumed: false, }); if op.eq_only() { if rowid.row_id == row_id { Ok(IOResult::Done(SeekResult::Found)) } else { Ok(IOResult::Done(SeekResult::NotFound)) } } else { Ok(IOResult::Done(SeekResult::Found)) } } else { let forwards = matches!(op, SeekOp::GE { eq_only: _ } | SeekOp::GT); if forwards { let _ = self.last()?; } else { let _ = self.rewind()?; } Ok(IOResult::Done(SeekResult::NotFound)) } } /// Insert a row into the table. /// Sets the cursor to the inserted row. fn insert(&mut self, key: &BTreeKey) -> Result> { let Some(rowid) = key.maybe_rowid() else { todo!() }; let row_id = RowID::new(self.table_id, rowid); let record_buf = key.get_record().unwrap().get_payload().to_vec(); let num_columns = match key { BTreeKey::IndexKey(record) => record.column_count(), BTreeKey::TableRowId((_, record)) => record.as_ref().unwrap().column_count(), }; let row = crate::mvcc::database::Row::new(row_id, record_buf, num_columns); self.current_pos.replace(CursorPosition::Loaded { row_id: row.id, in_btree: false, btree_consumed: true, }); // FIXME: set btree to somewhere close to this rowid? if self.db.read(self.tx_id, row.id)?.is_some() { self.db.update(self.tx_id, row).inspect_err(|_| { self.current_pos.replace(CursorPosition::BeforeFirst); })?; } else { self.db.insert(self.tx_id, row).inspect_err(|_| { self.current_pos.replace(CursorPosition::BeforeFirst); })?; } self.invalidate_record(); Ok(IOResult::Done(())) } fn delete(&mut self) -> Result> { let IOResult::Done(Some(rowid)) = self.rowid()? else { todo!(); }; let rowid = RowID::new(self.table_id, rowid); self.db.delete(self.tx_id, rowid)?; self.invalidate_record(); Ok(IOResult::Done(())) } fn set_null_flag(&mut self, flag: bool) { self.null_flag = flag; } fn get_null_flag(&self) -> bool { self.null_flag } fn exists(&mut self, key: &Value) -> Result> { self.invalidate_record(); let int_key = match key { Value::Integer(i) => i, _ => unreachable!("btree tables are indexed by integers!"), }; let rowid = self.db.seek_rowid( Bound::Included(&RowID { table_id: self.table_id, row_id: *int_key, }), true, self.tx_id, ); tracing::trace!("found {rowid:?}"); let exists = if let Some(rowid) = rowid { rowid.row_id == *int_key } else { false }; if exists { self.current_pos.replace(CursorPosition::Loaded { row_id: RowID { table_id: self.table_id, row_id: *int_key, }, in_btree: false, btree_consumed: false, }); } Ok(IOResult::Done(exists)) } fn clear_btree(&mut self) -> Result>> { todo!() } fn btree_destroy(&mut self) -> Result>> { todo!() } fn count(&mut self) -> Result> { todo!() } /// Returns true if the is not pointing to any row. fn is_empty(&self) -> bool { // If we reached the end of the table, it means we traversed the whole table therefore there must be something in the table. // If we have loaded a row, it means there is something in the table. match self.get_current_pos() { CursorPosition::Loaded { .. } => false, CursorPosition::BeforeFirst => true, CursorPosition::End => true, } } fn root_page(&self) -> i64 { self.table_id.into() } fn rewind(&mut self) -> Result> { // First run btree_cursor rewind so that we don't need a explicit state machine. if self.is_btree_allocated() { return_if_io!(self.btree_cursor.rewind()); } self.invalidate_record(); if !matches!(self.get_current_pos(), CursorPosition::BeforeFirst) { self.current_pos.replace(CursorPosition::BeforeFirst); } let new_position_in_mvcc = self .db .get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id) .map(|id| id.row_id); let IOResult::Done(maybe_rowid_in_btree) = self.btree_cursor.rowid()? else { panic!("BTree should have returned rowid after rewind because we called btree_cursor.rewind()"); }; let maybe_rowid_in_btree = maybe_rowid_in_btree.filter(|rowid| self.query_btree_version_is_valid(*rowid)); let new_position = self.get_new_position_from_mvcc_and_btree( &new_position_in_mvcc, &maybe_rowid_in_btree, IterationDirection::Forwards, ); self.current_pos.replace(new_position); Ok(IOResult::Done(())) } fn has_record(&self) -> bool { todo!() } fn set_has_record(&self, _has_record: bool) { todo!() } fn get_index_info(&self) -> &crate::types::IndexInfo { todo!() } fn seek_end(&mut self) -> Result> { todo!() } fn seek_to_last(&mut self) -> Result> { self.invalidate_record(); let max_rowid = RowID { table_id: self.table_id, row_id: i64::MAX, }; let bound = Bound::Included(&max_rowid); let lower_bound = false; let rowid = self.db.seek_rowid(bound, lower_bound, self.tx_id); if let Some(rowid) = rowid { self.current_pos.replace(CursorPosition::Loaded { row_id: rowid, in_btree: false, btree_consumed: false, }); } else { self.current_pos.replace(CursorPosition::End); } Ok(IOResult::Done(())) } fn invalidate_record(&mut self) { self.get_immutable_record_or_create() .as_mut() .unwrap() .invalidate(); self.record_cursor.borrow_mut().invalidate(); } fn has_rowid(&self) -> bool { todo!() } fn record_cursor_mut(&self) -> std::cell::RefMut<'_, crate::types::RecordCursor> { self.record_cursor.borrow_mut() } fn get_pager(&self) -> Arc { todo!() } fn get_skip_advance(&self) -> bool { todo!() } } impl Debug for MvccLazyCursor { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("MvccLazyCursor") .field("current_pos", &self.current_pos) .field("table_id", &self.table_id) .field("tx_id", &self.tx_id) .field("reusable_immutable_record", &self.reusable_immutable_record) .field("btree_cursor", &()) .finish() } }