diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index 256199741..42ed228b5 100644 --- a/core/mvcc/cursor.rs +++ b/core/mvcc/cursor.rs @@ -3,6 +3,7 @@ use parking_lot::RwLock; use crate::mvcc::clock::LogicalClock; use crate::mvcc::database::{MVTableId, MvStore, Row, RowID, RowVersionState}; use crate::storage::btree::{BTreeCursor, BTreeKey, CursorTrait}; +use crate::translate::plan::IterationDirection; use crate::types::{IOResult, ImmutableRecord, RecordCursor, SeekKey, SeekOp, SeekResult}; use crate::{return_if_io, Result}; use crate::{Pager, Value}; @@ -36,9 +37,16 @@ enum NextState { }, } +#[derive(Debug, Clone, Copy)] +enum PrevState { + PrevBtree { + new_position_in_mvcc: CursorPosition, + }, +} #[derive(Debug, Clone, Copy)] enum MvccLazyCursorState { Next(NextState), + Prev(PrevState), } pub struct MvccLazyCursor { @@ -151,7 +159,7 @@ impl MvccLazyCursor { btree_consumed: _, } => row_id.row_id + 1, CursorPosition::BeforeFirst => 1, - CursorPosition::End => i64::MAX, + CursorPosition::End => 1, } } @@ -169,32 +177,38 @@ impl MvccLazyCursor { } /// Returns the new position of the cursor based on the new position in MVCC and the current rowid in BTree. - /// If the new position in MVCC is less than the current rowid in BTree, the cursor will be set to the new position in MVCC. + /// If we are moving forwards -> choose smaller rowid (mvcc if mvcc < btree, else btree) + /// If we are moving backwards -> choose larger rowid (mvcc if mvcc > btree, else btree) fn get_new_position_from_mvcc_and_btree( &mut self, new_position_in_mvcc: &Option, current_rowid_in_btree: &Option, + direction: IterationDirection, ) -> CursorPosition { + tracing::trace!("get_new_position_from_mvcc_and_btree(new_position_in_mvcc={:?}, current_rowid_in_btree={:?}, direction={:?})", new_position_in_mvcc, current_rowid_in_btree, direction); match (new_position_in_mvcc, current_rowid_in_btree) { (Some(mvcc_rowid), Some(btree_rowid)) => { - if mvcc_rowid < btree_rowid { - CursorPosition::Loaded { - row_id: RowID { - table_id: self.table_id, - row_id: *mvcc_rowid, - }, - in_btree: false, - btree_consumed: false, // there is a BTree rowid being pointed but we choose not to consume it yet. - } + // When forwards: choose smaller rowid (mvcc if mvcc < btree, else btree) + // When backwards: choose larger rowid (mvcc if mvcc > btree, else btree) + let use_mvcc = if direction == IterationDirection::Forwards { + mvcc_rowid < btree_rowid } else { - CursorPosition::Loaded { - row_id: RowID { - table_id: self.table_id, - row_id: *btree_rowid, - }, - in_btree: true, - btree_consumed: true, - } + mvcc_rowid > btree_rowid + }; + + let (row_id, in_btree, btree_consumed) = if use_mvcc { + (*mvcc_rowid, false, false) + } else { + (*btree_rowid, true, true) + }; + + CursorPosition::Loaded { + row_id: RowID { + table_id: self.table_id, + row_id, + }, + in_btree, + btree_consumed, } } (None, Some(btree_rowid)) => CursorPosition::Loaded { @@ -234,19 +248,28 @@ impl MvccLazyCursor { impl CursorTrait for MvccLazyCursor { fn last(&mut self) -> Result> { - let last_rowid = self.db.get_last_rowid(self.table_id); - if let Some(last_rowid) = last_rowid { - self.current_pos.replace(CursorPosition::Loaded { - row_id: RowID { - table_id: self.table_id, - row_id: last_rowid, - }, - in_btree: false, - btree_consumed: false, - }); - } else { - self.current_pos.replace(CursorPosition::BeforeFirst); + if self.is_btree_allocated() { + return_if_io!(self.btree_cursor.last()); } + + self.invalidate_record(); + self.current_pos.replace(CursorPosition::End); + + let position_in_mvcc = self.db.get_last_rowid(self.table_id); + let position_in_btree = if self.is_btree_allocated() { + let IOResult::Done(maybe_rowid) = self.btree_cursor.rowid()? else { + panic!("BTree should have returned rowid after last"); + }; + maybe_rowid + } else { + None + }; + let new_position = self.get_new_position_from_mvcc_and_btree( + &position_in_mvcc, + &position_in_btree, + IterationDirection::Backwards, + ); + self.current_pos.replace(new_position); self.invalidate_record(); Ok(IOResult::Done(())) } @@ -362,6 +385,7 @@ impl CursorTrait for MvccLazyCursor { let new_position = self.get_new_position_from_mvcc_and_btree( &new_position_in_mvcc, ¤t_rowid_in_btree, + IterationDirection::Forwards, ); self.current_pos.replace(new_position); self.invalidate_record(); @@ -375,7 +399,117 @@ impl CursorTrait for MvccLazyCursor { } fn prev(&mut self) -> Result> { - todo!() + let current_state = *self.state.borrow(); + if current_state.is_none() { + let max_id = match *self.current_pos.borrow() { + CursorPosition::Loaded { + row_id, + in_btree: _, + btree_consumed: _, + } => row_id.row_id, + CursorPosition::BeforeFirst => { + return Ok(IOResult::Done(false)); + } + CursorPosition::End => { + i64::MAX // we need to find last row, so we look from the last id, + } + }; + + let new_position_in_mvcc = + match self + .db + .get_prev_row_id_for_table(self.table_id, max_id, self.tx_id) + { + Some(id) => CursorPosition::Loaded { + row_id: id, + in_btree: false, + btree_consumed: false, + }, + None => CursorPosition::BeforeFirst, + }; + self.state + .replace(Some(MvccLazyCursorState::Prev(PrevState::PrevBtree { + new_position_in_mvcc, + }))); + } + // Now we need to loop for prev rowid in btree that is valid. + // FIXME: this is quite unperformant, we should find a better way to do this. + loop { + let current_state = *self.state.borrow(); + let Some(MvccLazyCursorState::Prev(PrevState::PrevBtree { + new_position_in_mvcc, + })) = current_state + else { + panic!("Invalid state {:?}", self.state.borrow()); + }; + + // Check whether we have already consumed the rowid in btree. In BeforeFirst we can assume we haven't started calling next yet. + // In End we can assume we have already called next and it returned false, so we can assume we have consumed the rowid. + let btree_consumed = match self.get_current_pos() { + CursorPosition::Loaded { + row_id: _, + in_btree: _, + btree_consumed, + } => btree_consumed, + CursorPosition::BeforeFirst => true, + CursorPosition::End => true, + }; + + let found = if self.is_btree_allocated() { + // If we have a functional btree, let's either find next value, or use the one pointed at by the cursor. + if btree_consumed { + return_if_io!(self.btree_cursor.prev()) + } else { + true + } + } else { + // If we don't have a functional btree, we can't find next value, so we return false. + false + }; + // get current rowid in mvcc and in btree + let maybe_rowid_mvcc = match new_position_in_mvcc { + CursorPosition::Loaded { + row_id, + in_btree: _, + btree_consumed: _, + } => Some(row_id.row_id), + CursorPosition::BeforeFirst => None, + CursorPosition::End => None, + }; + let maybe_rowid_in_btree = if found { + let IOResult::Done(Some(rowid)) = self.btree_cursor.rowid()? else { + panic!("BTree should have returned rowid after prev because we found a valid rowid after calling btree_cursor.prev()"); + }; + if self.query_btree_version_is_valid(rowid) { + Some(rowid) + } else { + // if the row is not valid, we need to continue to the next rowid in btree. + // We first set consumed to true so that next time we call next, we don't use the same rowid. + if let CursorPosition::Loaded { btree_consumed, .. } = + &mut *self.current_pos.borrow_mut() + { + *btree_consumed = true; + } + continue; + } + } else { + None + }; + // Update based on direction. + let new_position = self.get_new_position_from_mvcc_and_btree( + &maybe_rowid_mvcc, + &maybe_rowid_in_btree, + IterationDirection::Backwards, + ); + self.current_pos.replace(new_position); + self.invalidate_record(); + self.state.replace(None); + + return Ok(IOResult::Done(matches!( + self.get_current_pos(), + CursorPosition::Loaded { .. } + ))); + } } fn rowid(&self) -> Result>> { @@ -574,12 +708,15 @@ impl CursorTrait for MvccLazyCursor { .map(|id| id.row_id); let IOResult::Done(maybe_rowid_in_btree) = self.btree_cursor.rowid()? else { - panic!("BTree should have returned rowid after next"); + panic!("BTree should have returned rowid after rewind because we called btree_cursor.rewind()"); }; let maybe_rowid_in_btree = maybe_rowid_in_btree.filter(|rowid| self.query_btree_version_is_valid(*rowid)); - let new_position = - self.get_new_position_from_mvcc_and_btree(&new_position_in_mvcc, &maybe_rowid_in_btree); + let new_position = self.get_new_position_from_mvcc_and_btree( + &new_position_in_mvcc, + &maybe_rowid_in_btree, + IterationDirection::Forwards, + ); self.current_pos.replace(new_position); Ok(IOResult::Done(())) } diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 9ad9f335d..6095a3bbe 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -9,6 +9,7 @@ use crate::storage::btree::CursorTrait; use crate::storage::btree::CursorValidState; use crate::storage::sqlite3_ondisk::DatabaseHeader; use crate::storage::wal::TursoRwLock; +use crate::translate::plan::IterationDirection; use crate::turso_assert; use crate::types::IOCompletions; use crate::types::IOResult; @@ -1267,35 +1268,108 @@ impl MvStore { start: i64, tx_id: TxID, ) -> Option { - tracing::trace!( - "getting_next_id_for_table(table_id={}, range_start={})", + let res = self.get_row_id_for_table_in_direction( table_id, start, + tx_id, + IterationDirection::Forwards, ); - let min_bound = RowID { + tracing::trace!( + "get_next_row_id_for_table(table_id={}, start={}, tx_id={}, res={:?})", table_id, - row_id: start, - }; + start, + tx_id, + res + ); + res + } - let max_bound = RowID { + pub fn get_prev_row_id_for_table( + &self, + table_id: MVTableId, + start: i64, + tx_id: TxID, + ) -> Option { + let res = self.get_row_id_for_table_in_direction( table_id, - row_id: i64::MAX, - }; + start, + tx_id, + IterationDirection::Backwards, + ); + tracing::trace!( + "get_prev_row_id_for_table(table_id={}, start={}, tx_id={}, res={:?})", + table_id, + start, + tx_id, + res + ); + res + } + + pub fn get_row_id_for_table_in_direction( + &self, + table_id: MVTableId, + start: i64, + tx_id: TxID, + direction: IterationDirection, + ) -> Option { + tracing::trace!( + "getting_row_id_for_table_in_direction(table_id={}, range_start={}, direction={:?})", + table_id, + start, + direction + ); let tx = self.txs.get(&tx_id).unwrap(); let tx = tx.value(); - let mut rows = self.rows.range(min_bound..max_bound); - loop { - // We are moving forward, so if a row was deleted we just need to skip it. Therefore, we need - // to loop either until we find a row that is not deleted or until we reach the end of the table. - let next_row = rows.next(); - let row = next_row?; + if direction == IterationDirection::Forwards { + let min_bound = RowID { + table_id, + row_id: start, + }; - // We found a row, let's check if it's visible to the transaction. - if let Some(visible_row) = self.find_last_visible_version(tx, row) { - return Some(visible_row); + let max_bound = RowID { + table_id, + row_id: i64::MAX, + }; + let mut rows = self.rows.range(min_bound..max_bound); + loop { + // We are moving forward, so if a row was deleted we just need to skip it. Therefore, we need + // to loop either until we find a row that is not deleted or until we reach the end of the table. + let next_row = rows.next(); + tracing::trace!("next_row: {:?}", next_row); + let row = next_row?; + + // We found a row, let's check if it's visible to the transaction. + if let Some(visible_row) = self.find_last_visible_version(tx, row) { + return Some(visible_row); + } + // If this row is not visible, continue to the next row + } + } else { + let min_bound = RowID { + table_id, + row_id: i64::MIN, + }; + + let max_bound = RowID { + table_id, + row_id: start, + }; + // In backward's direction we iterate in reverse order. + let mut rows = self.rows.range(min_bound..max_bound).rev(); + loop { + // We are moving backwards, so if a row was deleted we just need to skip it. Therefore, we need + // to loop either until we find a row that is not deleted or until we reach the beginning of the table. + let next_row = rows.next(); + let row = next_row?; + + // We found a row, let's check if it's visible to the transaction. + if let Some(visible_row) = self.find_last_visible_version(tx, row) { + return Some(visible_row); + } + // If this row is not visible, continue to the next row } - // If this row is not visible, continue to the next row } } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index cbc6d2281..fbe4bab03 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1633,3 +1633,59 @@ fn test_cursor_with_btree_and_mvcc_2() { assert_eq!(rows[1], vec![Value::Integer(2)]); assert_eq!(rows[2], vec![Value::Integer(3)]); } + +#[test] +fn test_cursor_with_btree_and_mvcc_with_backward_cursor() { + let mut db = MvccTestDbNoConn::new_with_random_db(); + // First write some rows and checkpoint so data is flushed to BTree file (.db) + { + let conn = db.connect(); + conn.execute("CREATE TABLE t(x integer primary key)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1)").unwrap(); + conn.execute("INSERT INTO t VALUES (3)").unwrap(); + conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + } + // Now restart so new connection will have to read data from BTree instead of MVCC. + db.restart(); + let conn = db.connect(); + // Insert a new row so that we have a gap in the BTree. + conn.execute("INSERT INTO t VALUES (2)").unwrap(); + let rows = get_rows(&conn, "SELECT * FROM t order by x desc"); + dbg!(&rows); + assert_eq!(rows.len(), 3); + assert_eq!(rows[0], vec![Value::Integer(3)]); + assert_eq!(rows[1], vec![Value::Integer(2)]); + assert_eq!(rows[2], vec![Value::Integer(1)]); +} + +#[test] +#[ignore = "we need to implement seek with btree cursor"] +fn test_cursor_with_btree_and_mvcc_with_backward_cursor_with_delete() { + let mut db = MvccTestDbNoConn::new_with_random_db(); + // First write some rows and checkpoint so data is flushed to BTree file (.db) + { + let conn = db.connect(); + conn.execute("CREATE TABLE t(x integer primary key)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1)").unwrap(); + conn.execute("INSERT INTO t VALUES (2)").unwrap(); + conn.execute("INSERT INTO t VALUES (4)").unwrap(); + conn.execute("INSERT INTO t VALUES (5)").unwrap(); + conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + } + // Now restart so new connection will have to read data from BTree instead of MVCC. + db.restart(); + let conn = db.connect(); + // Insert a new row so that we have a gap in the BTree. + conn.execute("INSERT INTO t VALUES (3)").unwrap(); + conn.execute("DELETE FROM t WHERE x = 2").unwrap(); + println!("getting rows"); + let rows = get_rows(&conn, "SELECT * FROM t order by x desc"); + dbg!(&rows); + assert_eq!(rows.len(), 4); + assert_eq!(rows[0], vec![Value::Integer(5)]); + assert_eq!(rows[1], vec![Value::Integer(4)]); + assert_eq!(rows[2], vec![Value::Integer(3)]); + assert_eq!(rows[3], vec![Value::Integer(1)]); +}