diff --git a/core/lib.rs b/core/lib.rs index e3cdea746..232e732d1 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -108,7 +108,7 @@ enum TransactionState { pub(crate) type MvStore = mvcc::MvStore; -pub(crate) type MvCursor = mvcc::cursor::ScanCursor; +pub(crate) type MvCursor = mvcc::cursor::MvccLazyCursor; /// The database manager ensures that there is a single, shared /// `Database` object per a database file. We need because it is not safe diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index 86187e0a2..db0d621a7 100644 --- a/core/mvcc/cursor.rs +++ b/core/mvcc/cursor.rs @@ -1,210 +1,103 @@ use crate::mvcc::clock::LogicalClock; use crate::mvcc::database::{MvStore, Result, Row, RowID}; -use crate::turso_assert; use std::fmt::Debug; use std::rc::Rc; +#[derive(Debug, Copy, Clone)] +enum CursorPosition { + /// We haven't loaded any row yet. + BeforeFirst, + /// We have loaded a row. + Loaded(RowID), + /// We have reached the end of the table. + End, +} #[derive(Debug)] -pub struct ScanCursor { +pub struct MvccLazyCursor { pub db: Rc>, - pub row_ids: Vec, - pub index: usize, + current_pos: CursorPosition, + table_id: u64, tx_id: u64, } -impl ScanCursor { - pub fn new(db: Rc>, tx_id: u64, table_id: u64) -> Result> { - let row_ids = db.scan_row_ids_for_table(table_id)?; +impl MvccLazyCursor { + pub fn new(db: Rc>, tx_id: u64, table_id: u64) -> Result> { Ok(Self { db, tx_id, - row_ids, - index: 0, + current_pos: CursorPosition::BeforeFirst, + table_id, }) } + /// Insert a row into the table. + /// Sets the cursor to the inserted row. 
+ pub fn insert(&mut self, row: Row) -> Result<()> { + self.current_pos = CursorPosition::Loaded(row.id); + self.db.insert(self.tx_id, row).inspect_err(|_| { + self.current_pos = CursorPosition::BeforeFirst; + })?; + Ok(()) + } + + pub fn current_row_id(&self) -> Option { + match self.current_pos { + CursorPosition::Loaded(id) => Some(id), + CursorPosition::BeforeFirst => None, + CursorPosition::End => None, + } + } + + pub fn current_row(&self) -> Result> { + match self.current_pos { + CursorPosition::Loaded(id) => self.db.read(self.tx_id, id), + CursorPosition::BeforeFirst => Ok(None), + CursorPosition::End => Ok(None), + } + } + + pub fn close(self) -> Result<()> { + Ok(()) + } + + /// Move the cursor to the next row. Returns true if the cursor moved to a row, false if there is no next row (the table is empty or the cursor is at the end of the table). + pub fn forward(&mut self) -> bool { + let before_first = matches!(self.current_pos, CursorPosition::BeforeFirst); + let min_id = match self.current_pos { + CursorPosition::Loaded(id) => id.row_id + 1, + CursorPosition::BeforeFirst => i64::MIN, // we need to find first row, so we look from the first id + CursorPosition::End => { + // let's keep same state, we reached the end so no point in moving forward. + return false; + } + }; + self.current_pos = match self.db.get_next_row_id_for_table(self.table_id, min_id) { + Some(id) => CursorPosition::Loaded(id), + None => { + if before_first { + // if it wasn't loaded and we didn't find anything, it means the table is empty. + CursorPosition::BeforeFirst + } else { + // if we had something loaded, and we didn't find next key then it means we are at the end. + CursorPosition::End + } + } + }; + matches!(self.current_pos, CursorPosition::Loaded(_)) + } + + /// Returns true if the cursor is not pointing to any row. + pub fn is_empty(&self) -> bool { + // Before the first row and at the end of the table the cursor points at no row, so both positions count as empty here.
+ // If we have loaded a row, it means there is something in the table. + match self.current_pos { + CursorPosition::Loaded(_) => false, + CursorPosition::BeforeFirst => true, + CursorPosition::End => true, + } + } + pub fn rewind(&mut self) { - self.index = 0; - } - - pub fn insert(&self, row: Row) -> Result<()> { - self.db.insert(self.tx_id, row) - } - - pub fn current_row_id(&self) -> Option { - turso_assert!(self.index > 0, "index must be greater than zero"); - let idx = self.index - 1; - if idx >= self.row_ids.len() { - return None; - } - Some(self.row_ids[idx]) - } - - pub fn current_row(&self) -> Result> { - turso_assert!(self.index > 0, "index must be greater than zero"); - let idx = self.index - 1; - if idx >= self.row_ids.len() { - return Ok(None); - } - let id = self.row_ids[idx]; - self.db.read(self.tx_id, id) - } - - pub fn close(self) -> Result<()> { - Ok(()) - } - - pub fn forward(&mut self) -> bool { - self.index += 1; - let idx = self.index - 1; - idx < self.row_ids.len() - } - - pub fn is_empty(&self) -> bool { - self.index >= self.row_ids.len() - } -} - -#[derive(Debug)] -pub struct LazyScanCursor { - pub db: Rc>, - pub current_pos: Option, - pub prev_pos: Option, - table_id: u64, - tx_id: u64, -} - -impl LazyScanCursor { - pub fn new(db: Rc>, tx_id: u64, table_id: u64) -> Result> { - let current_pos = db.get_next_row_id_for_table(table_id, 0); - Ok(Self { - db, - tx_id, - current_pos, - prev_pos: None, - table_id, - }) - } - - pub fn insert(&self, row: Row) -> Result<()> { - self.db.insert(self.tx_id, row) - } - - pub fn current_row_id(&self) -> Option { - self.current_pos - } - - pub fn current_row(&self) -> Result> { - if let Some(id) = self.current_pos { - self.db.read(self.tx_id, id) - } else { - Ok(None) - } - } - - pub fn close(self) -> Result<()> { - Ok(()) - } - - pub fn forward(&mut self) -> bool { - self.prev_pos = self.current_pos; - if let Some(row_id) = self.prev_pos { - let next_id = row_id.row_id + 1; - self.current_pos = 
self.db.get_next_row_id_for_table(self.table_id, next_id); - println!("{:?}", self.current_pos); - self.current_pos.is_some() - } else { - false - } - } - - pub fn is_empty(&self) -> bool { - self.current_pos.is_none() - } -} - -#[derive(Debug)] -pub struct BucketScanCursor { - pub db: Rc>, - pub bucket: Vec, - bucket_size: u64, - table_id: u64, - tx_id: u64, - index: usize, -} - -impl BucketScanCursor { - pub fn new( - db: Rc>, - tx_id: u64, - table_id: u64, - size: u64, - ) -> Result> { - let mut bucket = Vec::with_capacity(size as usize); - db.get_row_id_range(table_id, 0, &mut bucket, size)?; - Ok(Self { - db, - tx_id, - bucket, - bucket_size: size, - table_id, - index: 0, - }) - } - - pub fn insert(&self, row: Row) -> Result<()> { - self.db.insert(self.tx_id, row) - } - - pub fn current_row_id(&self) -> Option { - if self.index >= self.bucket.len() { - return None; - } - Some(self.bucket[self.index]) - } - - pub fn current_row(&self) -> Result> { - if self.index >= self.bucket.len() { - return Ok(None); - } - let id = self.bucket[self.index]; - self.db.read(self.tx_id, id) - } - - pub fn close(self) -> Result<()> { - Ok(()) - } - - pub fn forward(&mut self) -> bool { - self.index += 1; - if self.index < self.bucket.len() { - true - } else { - self.next_bucket().unwrap_or_default() - } - } - - pub fn is_empty(&self) -> bool { - self.index >= self.bucket.len() - } - - fn next_bucket(&mut self) -> Result { - let last_id = if !self.bucket.is_empty() { - Some(self.bucket[self.bucket.len() - 1].row_id + 1) - } else { - None - }; - - self.bucket.clear(); - - if let Some(next_id) = last_id { - self.db - .get_row_id_range(self.table_id, next_id, &mut self.bucket, self.bucket_size)?; - - self.index = 0; - Ok(!self.bucket.is_empty()) - } else { - Ok(false) - } + self.current_pos = CursorPosition::BeforeFirst; } } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 08acd9634..d4949d3af 100644 --- a/core/mvcc/database/tests.rs +++ 
b/core/mvcc/database/tests.rs @@ -618,7 +618,7 @@ fn test_future_row() { } use crate::mvcc::clock::LogicalClock; -use crate::mvcc::cursor::{BucketScanCursor, LazyScanCursor, ScanCursor}; +use crate::mvcc::cursor::MvccLazyCursor; use crate::mvcc::database::{MvStore, Row, RowID}; use crate::mvcc::persistent_storage::Storage; use std::rc::Rc; @@ -677,15 +677,15 @@ fn setup_test_db() -> (Rc>, u64) { (db, tx_id) } -fn setup_sequential_db() -> (Rc>, u64) { +fn setup_lazy_db(initial_keys: &[i64]) -> (Rc>, u64) { let clock = TestClock::new(1); let storage = Storage::new_noop(); let db = Rc::new(MvStore::new(clock, storage)); let tx_id = db.begin_tx(); let table_id = 1; - for i in 1..6 { - let id = RowID::new(table_id, i); + for i in initial_keys { + let id = RowID::new(table_id, *i); let data = format!("row{i}").into_bytes(); let row = Row::new(id, data); db.insert(tx_id, row).unwrap(); @@ -699,12 +699,13 @@ fn setup_sequential_db() -> (Rc>, u64) { #[test] fn test_lazy_scan_cursor_basic() { - let (db, tx_id) = setup_sequential_db(); + let (db, tx_id) = setup_lazy_db(&[1, 2, 3, 4, 5]); let table_id = 1; - let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap(); + let mut cursor = MvccLazyCursor::new(db.clone(), tx_id, table_id).unwrap(); // Check first row + assert!(cursor.forward()); assert!(!cursor.is_empty()); let row = cursor.current_row().unwrap().unwrap(); assert_eq!(row.id.row_id, 1); @@ -730,9 +731,10 @@ fn test_lazy_scan_cursor_with_gaps() { let (db, tx_id) = setup_test_db(); let table_id = 1; - let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap(); + let mut cursor = MvccLazyCursor::new(db.clone(), tx_id, table_id).unwrap(); // Check first row + assert!(cursor.forward()); assert!(!cursor.is_empty()); let row = cursor.current_row().unwrap().unwrap(); assert_eq!(row.id.row_id, 5); @@ -755,73 +757,11 @@ fn test_lazy_scan_cursor_with_gaps() { } #[test] -fn test_bucket_scan_cursor_basic() { - let (db, tx_id) = 
setup_sequential_db(); +fn test_cursor_basic() { + let (db, tx_id) = setup_lazy_db(&[1, 2, 3, 4, 5]); let table_id = 1; - // Create a bucket size that's smaller than the total rows - let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 3).unwrap(); - - // Check first row - assert!(!cursor.is_empty()); - let row = cursor.current_row().unwrap().unwrap(); - assert_eq!(row.id.row_id, 1); - - // Iterate through all rows - let mut count = 1; - let mut row_ids = Vec::new(); - row_ids.push(row.id.row_id); - - while cursor.forward() { - count += 1; - let row = cursor.current_row().unwrap().unwrap(); - row_ids.push(row.id.row_id); - } - - // Should have found 5 rows - assert_eq!(count, 5); - assert_eq!(row_ids, vec![1, 2, 3, 4, 5]); - - // After the last row, is_empty should return true - assert!(cursor.is_empty()); -} - -#[test] -fn test_bucket_scan_cursor_with_gaps() { - let (db, tx_id) = setup_test_db(); - let table_id = 1; - - // Create a bucket size of 2 to force multiple bucket loads - let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 2).unwrap(); - - // Check first row - assert!(!cursor.is_empty()); - let row = cursor.current_row().unwrap().unwrap(); - assert_eq!(row.id.row_id, 5); - - // Test moving forward and checking IDs - let expected_ids = [5, 10, 15, 20, 30]; - let mut row_ids = Vec::new(); - row_ids.push(row.id.row_id); - - while cursor.forward() { - let row = cursor.current_row().unwrap().unwrap(); - row_ids.push(row.id.row_id); - } - - // Should have all expected IDs - assert_eq!(row_ids, expected_ids); - - // After the last row, is_empty should return true - assert!(cursor.is_empty()); -} - -#[test] -fn test_scan_cursor_basic() { - let (db, tx_id) = setup_sequential_db(); - let table_id = 1; - - let mut cursor = ScanCursor::new(db.clone(), tx_id, table_id).unwrap(); + let mut cursor = MvccLazyCursor::new(db.clone(), tx_id, table_id).unwrap(); cursor.forward(); @@ -855,30 +795,20 @@ fn test_cursor_with_empty_table() { let 
table_id = 1; // Empty table // Test LazyScanCursor with empty table - let cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap(); - assert!(cursor.is_empty()); - assert!(cursor.current_row_id().is_none()); - - // Test BucketScanCursor with empty table - let cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 10).unwrap(); - assert!(cursor.is_empty()); - assert!(cursor.current_row_id().is_none()); - - // Test ScanCursor with empty table - let mut cursor = ScanCursor::new(db.clone(), tx_id, table_id).unwrap(); - cursor.forward(); + let cursor = MvccLazyCursor::new(db.clone(), tx_id, table_id).unwrap(); assert!(cursor.is_empty()); assert!(cursor.current_row_id().is_none()); } #[test] fn test_cursor_modification_during_scan() { - let (db, tx_id) = setup_sequential_db(); + let (db, tx_id) = setup_lazy_db(&[1, 2, 3, 4, 5]); let table_id = 1; - let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap(); + let mut cursor = MvccLazyCursor::new(db.clone(), tx_id, table_id).unwrap(); // Read first row + assert!(cursor.forward()); let first_row = cursor.current_row().unwrap().unwrap(); assert_eq!(first_row.id.row_id, 1); @@ -888,56 +818,22 @@ fn test_cursor_modification_during_scan() { let new_row = Row::new(new_row_id, new_row_data); cursor.insert(new_row).unwrap(); - - // Continue scanning - the cursor should still work correctly - cursor.forward(); // Move to 2 - let row = cursor.current_row().unwrap().unwrap(); - assert_eq!(row.id.row_id, 2); - - cursor.forward(); // Move to 3 (our new row) let row = cursor.current_row().unwrap().unwrap(); assert_eq!(row.id.row_id, 3); assert_eq!(row.data, b"new_row".to_vec()); + // Continue scanning - the cursor should still work correctly cursor.forward(); // Move to 4 let row = cursor.current_row().unwrap().unwrap(); assert_eq!(row.id.row_id, 4); -} -#[test] -fn test_bucket_scan_cursor_next_bucket() { - let (db, tx_id) = setup_test_db(); - let table_id = 1; - - // Create a bucket size of 1 to force 
bucket loading for each row - let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 1).unwrap(); - - // Get the first row - assert!(!cursor.is_empty()); + cursor.forward(); // Move to 5 (the last row from the setup keys) let row = cursor.current_row().unwrap().unwrap(); assert_eq!(row.id.row_id, 5); - - // Move to the next row - this should trigger next_bucket() - assert!(cursor.forward()); - let row = cursor.current_row().unwrap().unwrap(); - assert_eq!(row.id.row_id, 10); - - // Move to the next row again - assert!(cursor.forward()); - let row = cursor.current_row().unwrap().unwrap(); - assert_eq!(row.id.row_id, 15); - - // Continue to the end - assert!(cursor.forward()); - assert_eq!(cursor.current_row().unwrap().unwrap().id.row_id, 20); - - assert!(cursor.forward()); - assert_eq!(cursor.current_row().unwrap().unwrap().id.row_id, 30); - - // Should be no more rows assert!(!cursor.forward()); assert!(cursor.is_empty()); } + /* States described in the Hekaton paper *for serializability*: Table 1: Case analysis of action to take when version V’s