diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs
index 5fed0941f..098cb828e 100644
--- a/core/mvcc/cursor.rs
+++ b/core/mvcc/cursor.rs
@@ -54,3 +54,147 @@ impl<Clock: LogicalClock> ScanCursor<Clock> {
         self.index >= self.row_ids.len()
     }
 }
+
+/// A scan cursor that lazily fetches the next row id from the MVCC store one step at a time.
+#[derive(Debug)]
+pub struct LazyScanCursor<Clock: LogicalClock> {
+    pub db: Rc<MvStore<Clock>>,
+    pub current_pos: Option<RowID>,
+    pub prev_pos: Option<RowID>,
+    table_id: u64,
+    tx_id: u64,
+}
+
+impl<Clock: LogicalClock> LazyScanCursor<Clock> {
+    pub fn new(db: Rc<MvStore<Clock>>, tx_id: u64, table_id: u64) -> Result<Self> {
+        let current_pos = db.get_next_row_id_for_table(table_id, 0);
+        Ok(Self {
+            db,
+            tx_id,
+            current_pos,
+            prev_pos: None,
+            table_id,
+        })
+    }
+
+    pub fn insert(&self, row: Row) -> Result<()> {
+        self.db.insert(self.tx_id, row)
+    }
+
+    pub fn current_row_id(&self) -> Option<RowID> {
+        self.current_pos
+    }
+
+    pub fn current_row(&self) -> Result<Option<Row>> {
+        if let Some(id) = self.current_pos {
+            self.db.read(self.tx_id, id)
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn close(self) -> Result<()> {
+        Ok(())
+    }
+
+    pub fn forward(&mut self) -> bool {
+        self.prev_pos = self.current_pos;
+        if let Some(row_id) = self.prev_pos {
+            let next_id = row_id.row_id + 1;
+            self.current_pos = self.db.get_next_row_id_for_table(self.table_id, next_id);
+            tracing::trace!("forward(current_pos={:?})", self.current_pos);
+            self.current_pos.is_some()
+        } else {
+            false
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.current_pos.is_none()
+    }
+}
+
+/// A scan cursor that loads row ids in fixed-size buckets and refills the bucket as the scan advances.
+#[derive(Debug)]
+pub struct BucketScanCursor<Clock: LogicalClock> {
+    pub db: Rc<MvStore<Clock>>,
+    pub bucket: Vec<RowID>,
+    bucket_size: u64,
+    table_id: u64,
+    tx_id: u64,
+    index: usize,
+}
+
+impl<Clock: LogicalClock> BucketScanCursor<Clock> {
+    pub fn new(
+        db: Rc<MvStore<Clock>>,
+        tx_id: u64,
+        table_id: u64,
+        size: u64,
+    ) -> Result<Self> {
+        let mut bucket = Vec::with_capacity(size as usize);
+        db.get_row_id_range(table_id, 0, &mut bucket, size)?;
+        Ok(Self {
+            db,
+            tx_id,
+            bucket,
+            bucket_size: size,
+            table_id,
+            index: 0,
+        })
+    }
+
+    pub fn insert(&self, row: Row) -> Result<()> {
+        self.db.insert(self.tx_id, row)
+    }
+
+    pub fn current_row_id(&self) -> Option<RowID> {
+        if self.index >= self.bucket.len() {
+            return None;
+        }
+        Some(self.bucket[self.index])
+    }
+
+    pub fn current_row(&self) -> Result<Option<Row>> {
+        if self.index >= self.bucket.len() {
+            return Ok(None);
+        }
+        let id = self.bucket[self.index];
+        self.db.read(self.tx_id, id)
+    }
+
+    pub fn close(self) -> Result<()> {
+        Ok(())
+    }
+
+    pub fn forward(&mut self) -> bool {
+        self.index += 1;
+        if self.index < self.bucket.len() {
+            true
+        } else {
+            self.next_bucket().unwrap_or_default()
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.index >= self.bucket.len()
+    }
+
+    fn next_bucket(&mut self) -> Result<bool> {
+        let last_id = if !self.bucket.is_empty() {
+            Some(self.bucket[self.bucket.len() - 1].row_id + 1)
+        } else {
+            None
+        };
+
+        self.bucket.clear();
+
+        if let Some(next_id) = last_id {
+            self.db
+                .get_row_id_range(self.table_id, next_id, &mut self.bucket, self.bucket_size)?;
+
+            self.index = 0;
+            Ok(!self.bucket.is_empty())
+        } else {
+            Ok(false)
+        }
+    }
+}
diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs
index ea19b4386..78e2ed9ee 100644
--- a/core/mvcc/database/mod.rs
+++ b/core/mvcc/database/mod.rs
@@ -422,6 +422,58 @@ impl<Clock: LogicalClock> MvStore<Clock> {
             .collect())
     }
 
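+    /// Collects up to `max_items` row ids for `table_id`, starting from row id `start`,
+    /// into `bucket`.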
+    pub fn get_row_id_range(
+        &self,
+        table_id: u64,
+        start: u64,
+        bucket: &mut Vec<RowID>,
+        max_items: u64,
+    ) -> Result<()> {
+        tracing::trace!(
+            "get_row_id_range(table_id={}, range_start={})",
+            table_id,
+            start,
+        );
+        let start_id = RowID {
+            table_id,
+            row_id: start,
+        };
+
+        let end_id = RowID {
+            table_id,
+            row_id: u64::MAX,
+        };
+
+        self.rows
+            .range(start_id..end_id)
+            .take(max_items as usize)
+            .for_each(|entry| bucket.push(*entry.key()));
+
+        Ok(())
+    }
+
+    pub fn get_next_row_id_for_table(&self, table_id: u64, start: u64) -> Option<RowID> {
+        tracing::trace!(
+            "get_next_row_id_for_table(table_id={}, range_start={})",
+            table_id,
+            start,
+        );
+        let min_bound = RowID {
+            table_id,
+            row_id: start,
+        };
+
+        let max_bound = RowID {
+            table_id,
+            row_id: u64::MAX,
+        };
+
+        self.rows
+            .range(min_bound..max_bound)
+            .next()
+            .map(|entry| *entry.key())
+    }
+
     /// Begins a new transaction in the database.
     ///
     /// This function starts a new transaction in the database and returns a `TxID` value
diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs
index 7d99d5414..b1c93a7ac 100644
--- a/core/mvcc/database/tests.rs
+++ b/core/mvcc/database/tests.rs
@@ -617,6 +617,324 @@ fn test_future_row() {
     assert_eq!(row, None);
 }
 
+use crate::mvcc::clock::LogicalClock;
+use crate::mvcc::cursor::{BucketScanCursor, LazyScanCursor, ScanCursor};
+use crate::mvcc::database::{MvStore, Row, RowID};
+use crate::mvcc::persistent_storage::Storage;
+use std::rc::Rc;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+// Simple atomic clock implementation for testing.
+struct TestClock {
+    counter: AtomicU64,
+}
+
+impl TestClock {
+    fn new(start: u64) -> Self {
+        Self {
+            counter: AtomicU64::new(start),
+        }
+    }
+}
+
+impl LogicalClock for TestClock {
+    fn get_timestamp(&self) -> u64 {
+        self.counter.fetch_add(1, Ordering::SeqCst)
+    }
+
+    fn reset(&self, ts: u64) {
+        let current = self.counter.load(Ordering::SeqCst);
+        if ts > current {
+            self.counter.store(ts, Ordering::SeqCst);
+        }
+    }
+}
+
+// Builds a store with rows at ids 5, 10, 15, 20, 30 in table 1 and returns a fresh transaction.
+fn setup_test_db() -> (Rc<MvStore<TestClock>>, u64) {
+    let clock = TestClock::new(1);
+    let storage = Storage::new_noop();
+    let db = Rc::new(MvStore::new(clock, storage));
+    let tx_id = db.begin_tx();
+
+    let table_id = 1;
+    let test_rows = [
+        (5, b"row5".to_vec()),
+        (10, b"row10".to_vec()),
+        (15, b"row15".to_vec()),
+        (20, b"row20".to_vec()),
+        (30, b"row30".to_vec()),
+    ];
+
+    for (row_id, data) in test_rows.iter() {
+        let id = RowID::new(table_id, *row_id);
+        let row = Row::new(id, data.clone());
+        db.insert(tx_id, row).unwrap();
+    }
+
+    db.commit_tx(tx_id).unwrap();
+
+    let tx_id = db.begin_tx();
+    (db, tx_id)
+}
+
+// Builds a store with rows at contiguous ids 1..=5 in table 1 and returns a fresh transaction.
+fn setup_sequential_db() -> (Rc<MvStore<TestClock>>, u64) {
+    let clock = TestClock::new(1);
+    let storage = Storage::new_noop();
+    let db = Rc::new(MvStore::new(clock, storage));
+    let tx_id = db.begin_tx();
+
+    let table_id = 1;
+    for i in 1..6 {
+        let id = RowID::new(table_id, i);
+        let data = format!("row{}", i).into_bytes();
+        let row = Row::new(id, data);
+        db.insert(tx_id, row).unwrap();
+    }
+
+    db.commit_tx(tx_id).unwrap();
+
+    let tx_id = db.begin_tx();
+    (db, tx_id)
+}
+
+#[test]
+fn test_lazy_scan_cursor_basic() {
+    let (db, tx_id) = setup_sequential_db();
+    let table_id = 1;
+
+    let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
+
+    // Check first row
+    assert!(!cursor.is_empty());
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 1);
+
+    // Iterate through all rows
+    let mut count = 1;
+    while cursor.forward() {
+        count += 1;
+        let row = cursor.current_row().unwrap().unwrap();
+        assert_eq!(row.id.row_id, count);
+    }
+
+    // Should have found 5 rows
+    assert_eq!(count, 5);
+
+    // After the last row, forward() fails and is_empty() reports exhaustion
+    assert!(!cursor.forward());
+    assert!(cursor.is_empty());
+}
+
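+// With the gap fixture (row ids 5, 10, 15, 20, 30), forward() must land on the next existing id, not just id + 1.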
+#[test]
+fn test_lazy_scan_cursor_with_gaps() {
+    let (db, tx_id) = setup_test_db();
+    let table_id = 1;
+
+    let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
+
+    // Check first row
+    assert!(!cursor.is_empty());
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 5);
+
+    // Test moving forward and checking IDs
+    let expected_ids = [5, 10, 15, 20, 30];
+    let mut index = 0;
+
+    assert_eq!(cursor.current_row_id().unwrap().row_id, expected_ids[index]);
+
+    while cursor.forward() {
+        index += 1;
+        if index < expected_ids.len() {
+            assert_eq!(cursor.current_row_id().unwrap().row_id, expected_ids[index]);
+        }
+    }
+
+    // The cursor should have stopped on the last of the five expected ids
+    assert_eq!(index, expected_ids.len() - 1);
+}
+
+#[test]
+fn test_bucket_scan_cursor_basic() {
+    let (db, tx_id) = setup_sequential_db();
+    let table_id = 1;
+
+    // Create a bucket size that's smaller than the total rows
+    let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 3).unwrap();
+
+    // Check first row
+    assert!(!cursor.is_empty());
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 1);
+
+    // Iterate through all rows
+    let mut count = 1;
+    let mut row_ids = Vec::new();
+    row_ids.push(row.id.row_id);
+
+    while cursor.forward() {
+        count += 1;
+        let row = cursor.current_row().unwrap().unwrap();
+        row_ids.push(row.id.row_id);
+    }
+
+    // Should have found 5 rows
+    assert_eq!(count, 5);
+    assert_eq!(row_ids, vec![1, 2, 3, 4, 5]);
+
+    // After the last row, is_empty should return true
+    assert!(cursor.is_empty());
+}
+
+#[test]
+fn test_bucket_scan_cursor_with_gaps() {
+    let (db, tx_id) = setup_test_db();
+    let table_id = 1;
+
+    // Create a bucket size of 2 to force multiple bucket loads
+    let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 2).unwrap();
+
+    // Check first row
+    assert!(!cursor.is_empty());
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 5);
+
+    // Test moving forward and checking IDs
+    let expected_ids = [5, 10, 15, 20, 30];
+    let mut row_ids = Vec::new();
+    row_ids.push(row.id.row_id);
+
+    while cursor.forward() {
+        let row = cursor.current_row().unwrap().unwrap();
+        row_ids.push(row.id.row_id);
+    }
+
+    // Should have all expected IDs
+    assert_eq!(row_ids, expected_ids);
+
+    // After the last row, is_empty should return true
+    assert!(cursor.is_empty());
+}
+
+#[test]
+fn test_scan_cursor_basic() {
+    let (db, tx_id) = setup_sequential_db();
+    let table_id = 1;
+
+    let mut cursor = ScanCursor::new(db.clone(), tx_id, table_id).unwrap();
+
+    // Check first row
+    assert!(!cursor.is_empty());
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 1);
+
+    // Iterate through all rows
+    let mut count = 1;
+    while cursor.forward() {
+        count += 1;
+        let row = cursor.current_row().unwrap().unwrap();
+        assert_eq!(row.id.row_id, count);
+    }
+
+    // Should have found 5 rows
+    assert_eq!(count, 5);
+
+    // After the last row, forward() fails and is_empty() reports exhaustion
+    assert!(!cursor.forward());
+    assert!(cursor.is_empty());
+}
+
+#[test]
+fn test_cursor_with_empty_table() {
+    let clock = TestClock::new(1);
+    let storage = Storage::new_noop();
+    let db = Rc::new(MvStore::new(clock, storage));
+    let tx_id = db.begin_tx();
+    let table_id = 1; // Empty table
+
+    // Test LazyScanCursor with empty table
+    let cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
+    assert!(cursor.is_empty());
+    assert!(cursor.current_row_id().is_none());
+
+    // Test BucketScanCursor with empty table
+    let cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 10).unwrap();
+    assert!(cursor.is_empty());
+    assert!(cursor.current_row_id().is_none());
+
+    // Test ScanCursor with empty table
+    let cursor = ScanCursor::new(db.clone(), tx_id, table_id).unwrap();
+    assert!(cursor.is_empty());
+    assert!(cursor.current_row_id().is_none());
+}
+
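+// A row inserted through the cursor's own transaction should become visible to the scan already in progress.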
+#[test]
+fn test_cursor_modification_during_scan() {
+    let (db, tx_id) = setup_sequential_db();
+    let table_id = 1;
+
+    let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
+
+    // Read first row
+    let first_row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(first_row.id.row_id, 1);
+
+    // Insert a new row with an ID between existing rows
+    let new_row_id = RowID::new(table_id, 3);
+    let new_row_data = b"new_row".to_vec();
+    let new_row = Row::new(new_row_id, new_row_data);
+
+    cursor.insert(new_row).unwrap();
+
+    // Continue scanning - the cursor should still work correctly
+    cursor.forward(); // Move to 2
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 2);
+
+    cursor.forward(); // Move to 3 (our new row)
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 3);
+    assert_eq!(row.data, b"new_row".to_vec());
+
+    cursor.forward(); // Move to 4
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 4);
+}
+
+#[test]
+fn test_bucket_scan_cursor_next_bucket() {
+    let (db, tx_id) = setup_test_db();
+    let table_id = 1;
+
+    // Create a bucket size of 1 to force a bucket load for each row
+    let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 1).unwrap();
+
+    // Get the first row
+    assert!(!cursor.is_empty());
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 5);
+
+    // Move to the next row - this should trigger next_bucket()
+    assert!(cursor.forward());
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 10);
+
+    // Move to the next row again
+    assert!(cursor.forward());
+    let row = cursor.current_row().unwrap().unwrap();
+    assert_eq!(row.id.row_id, 15);
+
+    // Continue to the end
+    assert!(cursor.forward());
+    assert_eq!(cursor.current_row().unwrap().unwrap().id.row_id, 20);
+
+    assert!(cursor.forward());
+    assert_eq!(cursor.current_row().unwrap().unwrap().id.row_id, 30);
+
+    // Should be no more rows
+    assert!(!cursor.forward());
+    assert!(cursor.is_empty());
+}
 
 /* States described in the Hekaton paper *for serializability*:
 Table 1: Case analysis of action to take when version V’s