Merge 'core/mvcc: simplify mvcc cursor types' from Pere Diaz Bou

We have so many cursor types that making all of them work properly would
be unbearable. Let's simplify this and focus only on the lazy cursor,
which in the future will load rows from the database when we need them.

Closes #2333
This commit is contained in:
Pekka Enberg
2025-07-30 09:10:44 +03:00
3 changed files with 104 additions and 315 deletions

View File

@@ -108,7 +108,7 @@ enum TransactionState {
/// Crate-wide alias for the MVCC store instantiated with `mvcc::LocalClock`.
pub(crate) type MvStore = mvcc::MvStore<mvcc::LocalClock>;
/// Crate-wide alias for the MVCC cursor, here the `ScanCursor` implementation
/// instantiated with `mvcc::LocalClock`.
pub(crate) type MvCursor = mvcc::cursor::ScanCursor<mvcc::LocalClock>;
/// Crate-wide alias for the MVCC cursor, here the `MvccLazyCursor` implementation
/// instantiated with `mvcc::LocalClock`.
pub(crate) type MvCursor = mvcc::cursor::MvccLazyCursor<mvcc::LocalClock>;
/// The database manager ensures that there is a single, shared
/// `Database` object per database file. We need this because it is not safe

View File

@@ -1,210 +1,103 @@
use crate::mvcc::clock::LogicalClock;
use crate::mvcc::database::{MvStore, Result, Row, RowID};
use crate::turso_assert;
use std::fmt::Debug;
use std::rc::Rc;
/// Where a cursor currently stands relative to the rows of its table.
#[derive(Debug, Copy, Clone)]
enum CursorPosition {
/// No row has been loaded yet; the cursor sits before the first row.
BeforeFirst,
/// A row has been loaded; the payload is the id of that row.
Loaded(RowID),
/// The scan moved past the last row of the table.
End,
}
/// Cursor over the rows of one table in the MVCC store, bound to one transaction.
#[derive(Debug)]
pub struct ScanCursor<Clock: LogicalClock> {
pub struct MvccLazyCursor<Clock: LogicalClock> {
/// Shared handle to the MVCC store that all reads and inserts go through.
pub db: Rc<MvStore<Clock>>,
/// Row ids of the table, collected up front by the eager (`ScanCursor`) variant.
pub row_ids: Vec<RowID>,
/// Position inside `row_ids` used by the eager variant.
pub index: usize,
/// Position of the lazy variant: before the first row, on a row, or at the end.
current_pos: CursorPosition,
/// Id of the table this cursor iterates over.
table_id: u64,
/// Id of the transaction used for reads and inserts.
tx_id: u64,
}
impl<Clock: LogicalClock> ScanCursor<Clock> {
/// Creates a cursor over `table_id` that operates inside transaction `tx_id`.
pub fn new(db: Rc<MvStore<Clock>>, tx_id: u64, table_id: u64) -> Result<ScanCursor<Clock>> {
// Eager variant: fetch every row id of the table now; a failing scan aborts construction.
let row_ids = db.scan_row_ids_for_table(table_id)?;
impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
/// Creates a cursor over `table_id` that operates inside transaction `tx_id`.
pub fn new(db: Rc<MvStore<Clock>>, tx_id: u64, table_id: u64) -> Result<MvccLazyCursor<Clock>> {
Ok(Self {
db,
tx_id,
row_ids,
// Eager variant: nothing has been consumed yet.
index: 0,
// Lazy variant: start before the first row; `forward` loads the first one.
current_pos: CursorPosition::BeforeFirst,
table_id,
})
}
/// Inserts `row` through this cursor's transaction and positions the cursor on it.
///
/// On success the cursor points at the inserted row. If the store rejects the
/// insert, the cursor is reset to the before-first position and the error is
/// handed back to the caller.
pub fn insert(&mut self, row: Row) -> Result<()> {
    // Grab the id first: `row` is moved into the store by the call below.
    let inserted_id = row.id;
    match self.db.insert(self.tx_id, row) {
        Ok(_) => {
            self.current_pos = CursorPosition::Loaded(inserted_id);
            Ok(())
        }
        Err(err) => {
            self.current_pos = CursorPosition::BeforeFirst;
            Err(err)
        }
    }
}
/// Returns the id of the row the cursor is positioned on.
///
/// Yields `None` while the cursor is before the first row or past the end.
pub fn current_row_id(&self) -> Option<RowID> {
    if let CursorPosition::Loaded(row_id) = self.current_pos {
        Some(row_id)
    } else {
        None
    }
}
/// Reads the row the cursor is positioned on, using this cursor's transaction.
///
/// Returns `Ok(None)` without touching the store when no row is loaded
/// (before the first row or past the end); otherwise returns whatever the
/// store's `read` returns for the loaded row id.
pub fn current_row(&self) -> Result<Option<Row>> {
    let CursorPosition::Loaded(row_id) = self.current_pos else {
        return Ok(None);
    };
    self.db.read(self.tx_id, row_id)
}
/// Consumes the cursor. Performs no work and always returns `Ok(())`.
pub fn close(self) -> Result<()> {
Ok(())
}
/// Move the cursor to the next row.
///
/// Returns `true` if the cursor now points at a row and `false` otherwise.
///
/// * From the before-first position the search starts at `i64::MIN`, i.e. at
///   the smallest possible row id. If nothing is found the table is empty and
///   the cursor stays before the first row.
/// * From a loaded row the search starts at the following row id. If nothing
///   is found the cursor moves to the end position.
/// * Once at the end the cursor stays there and `false` is returned.
pub fn forward(&mut self) -> bool {
    let started_before_first = matches!(self.current_pos, CursorPosition::BeforeFirst);
    let min_id = match self.current_pos {
        CursorPosition::Loaded(id) => match id.row_id.checked_add(1) {
            Some(next) => next,
            None => {
                // The loaded row id is `i64::MAX`, so no larger id can exist.
                // A plain `+ 1` would panic in debug builds and wrap around to
                // `i64::MIN` in release builds, restarting the scan.
                self.current_pos = CursorPosition::End;
                return false;
            }
        },
        // We need to find the first row, so look from the smallest id.
        CursorPosition::BeforeFirst => i64::MIN,
        // Already at the end: keep the state, there is nothing to move to.
        CursorPosition::End => return false,
    };
    // NOTE(review): `min_id` is presumably an inclusive lower bound for the
    // lookup — confirm against `get_next_row_id_for_table`.
    self.current_pos = match self.db.get_next_row_id_for_table(self.table_id, min_id) {
        Some(id) => CursorPosition::Loaded(id),
        // Nothing was loaded before and nothing was found: the table is empty.
        None if started_before_first => CursorPosition::BeforeFirst,
        // A row was loaded and there is no following key: we are at the end.
        None => CursorPosition::End,
    };
    matches!(self.current_pos, CursorPosition::Loaded(_))
}
/// Returns `true` if the cursor is not pointing at any row.
///
/// That is the case both before the first row has been loaded and after the
/// scan has run past the end of the table; only a loaded row yields `false`.
pub fn is_empty(&self) -> bool {
    !matches!(self.current_pos, CursorPosition::Loaded(_))
}
/// Resets the cursor's `index` to zero so that iteration starts over.
pub fn rewind(&mut self) {
self.index = 0;
}
/// Inserts `row` into the store using this cursor's transaction.
/// The cursor position is left untouched; the store's result is returned as is.
pub fn insert(&self, row: Row) -> Result<()> {
self.db.insert(self.tx_id, row)
}
/// Returns the id of the current row, i.e. the entry at `index - 1` of the
/// pre-collected row ids, or `None` when that slot lies past the end.
///
/// Asserts that `index` is greater than zero (that `forward` has been called
/// at least once).
pub fn current_row_id(&self) -> Option<RowID> {
    turso_assert!(self.index > 0, "index must be greater than zero");
    self.row_ids.get(self.index - 1).copied()
}
/// Reads the current row (the entry at `index - 1` of the pre-collected row
/// ids) through this cursor's transaction.
///
/// Returns `Ok(None)` when that slot lies past the end of the collected ids.
/// Asserts that `index` is greater than zero.
pub fn current_row(&self) -> Result<Option<Row>> {
    turso_assert!(self.index > 0, "index must be greater than zero");
    match self.row_ids.get(self.index - 1) {
        Some(&row_id) => self.db.read(self.tx_id, row_id),
        None => Ok(None),
    }
}
/// Consumes the cursor. Performs no work and always returns `Ok(())`.
pub fn close(self) -> Result<()> {
Ok(())
}
/// Advances the cursor by one slot.
///
/// Returns `true` if the slot that just became current (the value `index`
/// had before the increment) refers to one of the pre-collected row ids.
pub fn forward(&mut self) -> bool {
    let slot = self.index;
    self.index += 1;
    slot < self.row_ids.len()
}
/// Returns `true` once `index` has reached or passed the number of collected row ids.
// NOTE(review): this compares `index` directly, whereas `current_row` and
// `current_row_id` look at `index - 1` — confirm which convention is intended.
pub fn is_empty(&self) -> bool {
self.index >= self.row_ids.len()
}
}
/// Cursor that looks up one row id at a time from the MVCC store instead of
/// collecting all row ids of the table up front.
#[derive(Debug)]
pub struct LazyScanCursor<Clock: LogicalClock> {
/// Shared handle to the MVCC store that all reads and inserts go through.
pub db: Rc<MvStore<Clock>>,
/// Id of the row the cursor points at, or `None` when there is no such row.
pub current_pos: Option<RowID>,
/// Value `current_pos` had before the most recent `forward` call.
pub prev_pos: Option<RowID>,
/// Id of the table this cursor iterates over.
table_id: u64,
/// Id of the transaction used for reads and inserts.
tx_id: u64,
}
impl<Clock: LogicalClock> LazyScanCursor<Clock> {
    /// Creates a cursor over `table_id` inside transaction `tx_id` and
    /// immediately positions it on the first row id the store reports.
    pub fn new(db: Rc<MvStore<Clock>>, tx_id: u64, table_id: u64) -> Result<LazyScanCursor<Clock>> {
        // NOTE(review): the initial lookup starts at 0; if the bound is an
        // inclusive minimum, rows with negative ids would never be visited —
        // confirm whether that is intended.
        let current_pos = db.get_next_row_id_for_table(table_id, 0);
        Ok(Self {
            db,
            tx_id,
            current_pos,
            prev_pos: None,
            table_id,
        })
    }

    /// Inserts `row` into the store using this cursor's transaction.
    /// The cursor position is left untouched.
    pub fn insert(&self, row: Row) -> Result<()> {
        self.db.insert(self.tx_id, row)
    }

    /// Returns the id of the row the cursor points at, if any.
    pub fn current_row_id(&self) -> Option<RowID> {
        self.current_pos
    }

    /// Reads the row the cursor points at through this cursor's transaction.
    /// Returns `Ok(None)` when the cursor does not point at a row.
    pub fn current_row(&self) -> Result<Option<Row>> {
        match self.current_pos {
            Some(id) => self.db.read(self.tx_id, id),
            None => Ok(None),
        }
    }

    /// Consumes the cursor. Performs no work and always returns `Ok(())`.
    pub fn close(self) -> Result<()> {
        Ok(())
    }

    /// Moves the cursor to the next row id after the current one.
    ///
    /// Returns `true` if a following row id was found. Returns `false` when
    /// the cursor was not positioned on a row or when no further row exists;
    /// in both cases the cursor ends up without a current row.
    pub fn forward(&mut self) -> bool {
        self.prev_pos = self.current_pos;
        let Some(row_id) = self.prev_pos else {
            return false;
        };
        // `checked_add` guards the `i64::MAX` row id: a plain `+ 1` would panic
        // in debug builds and wrap around in release builds. No id can follow
        // `i64::MAX`, so the lookup is skipped and the scan simply ends.
        self.current_pos = row_id
            .row_id
            .checked_add(1)
            .and_then(|next_id| self.db.get_next_row_id_for_table(self.table_id, next_id));
        self.current_pos.is_some()
    }

    /// Returns `true` if the cursor does not point at any row.
    pub fn is_empty(&self) -> bool {
        self.current_pos.is_none()
    }
}
/// Cursor that fetches row ids from the MVCC store in batches ("buckets")
/// and iterates over the current batch.
#[derive(Debug)]
pub struct BucketScanCursor<Clock: LogicalClock> {
/// Shared handle to the MVCC store that all reads and inserts go through.
pub db: Rc<MvStore<Clock>>,
/// Row ids of the batch that is currently being iterated.
pub bucket: Vec<RowID>,
/// Size passed to the store on every batch request.
bucket_size: u64,
/// Id of the table this cursor iterates over.
table_id: u64,
/// Id of the transaction used for reads and inserts.
tx_id: u64,
/// Position of the current row inside `bucket`.
index: usize,
}
impl<Clock: LogicalClock> BucketScanCursor<Clock> {
/// Creates a cursor over `table_id` inside transaction `tx_id` that fetches
/// row ids in batches of `size`.
///
/// The first batch is requested right away, starting from row id 0; an error
/// from that request is propagated to the caller.
pub fn new(
    db: Rc<MvStore<Clock>>,
    tx_id: u64,
    table_id: u64,
    size: u64,
) -> Result<BucketScanCursor<Clock>> {
    let mut first_bucket = Vec::with_capacity(size as usize);
    db.get_row_id_range(table_id, 0, &mut first_bucket, size)?;
    let cursor = Self {
        db,
        tx_id,
        bucket: first_bucket,
        bucket_size: size,
        table_id,
        index: 0,
    };
    Ok(cursor)
}
/// Inserts `row` into the store using this cursor's transaction.
/// The current bucket and position are left untouched.
pub fn insert(&self, row: Row) -> Result<()> {
self.db.insert(self.tx_id, row)
}
/// Returns the row id at the current position of the bucket, or `None` when
/// the position lies past the end of the bucket.
pub fn current_row_id(&self) -> Option<RowID> {
    self.bucket.get(self.index).copied()
}
/// Reads the row at the current position of the bucket through this cursor's
/// transaction. Returns `Ok(None)` when the position lies past the end of the
/// bucket.
pub fn current_row(&self) -> Result<Option<Row>> {
    match self.bucket.get(self.index) {
        Some(&row_id) => self.db.read(self.tx_id, row_id),
        None => Ok(None),
    }
}
/// Consumes the cursor. Performs no work and always returns `Ok(())`.
pub fn close(self) -> Result<()> {
Ok(())
}
/// Advances to the next row id.
///
/// Returns `true` if the new position is still inside the current bucket.
/// Otherwise the next bucket is requested and its outcome is returned; an
/// error while loading that bucket is reported as `false`.
pub fn forward(&mut self) -> bool {
    self.index += 1;
    // `||` short-circuits, so a new bucket is only requested once the
    // current one is exhausted.
    self.index < self.bucket.len() || self.next_bucket().unwrap_or_default()
}
/// Returns `true` when the current position lies past the end of the bucket,
/// i.e. the cursor does not point at a row id.
pub fn is_empty(&self) -> bool {
self.index >= self.bucket.len()
}
/// Replaces the current bucket with the next batch of row ids.
///
/// Returns `Ok(true)` if the refilled bucket holds at least one id and
/// `Ok(false)` if the previous bucket was empty or the refill produced
/// nothing. Errors from the store's range query are propagated.
fn next_bucket(&mut self) -> Result<bool> {
// The next batch starts one past the last id of the current bucket;
// an empty bucket gives no starting point.
let last_id = if !self.bucket.is_empty() {
Some(self.bucket[self.bucket.len() - 1].row_id + 1)
} else {
None
};
// Drop the consumed ids before refilling the same vector.
self.bucket.clear();
if let Some(next_id) = last_id {
// NOTE(review): presumably fills `bucket` with up to `bucket_size` ids
// starting at `next_id` — confirm against `get_row_id_range`.
self.db
.get_row_id_range(self.table_id, next_id, &mut self.bucket, self.bucket_size)?;
// Restart iteration at the beginning of the fresh bucket.
self.index = 0;
Ok(!self.bucket.is_empty())
} else {
Ok(false)
}
// NOTE(review): the assignment below refers to `current_pos`, which is not a
// field of `BucketScanCursor`; it looks like a line of another method
// (`rewind`) that the diff rendering placed here — confirm.
self.current_pos = CursorPosition::BeforeFirst;
}
}

View File

@@ -618,7 +618,7 @@ fn test_future_row() {
}
use crate::mvcc::clock::LogicalClock;
use crate::mvcc::cursor::{BucketScanCursor, LazyScanCursor, ScanCursor};
use crate::mvcc::cursor::MvccLazyCursor;
use crate::mvcc::database::{MvStore, Row, RowID};
use crate::mvcc::persistent_storage::Storage;
use std::rc::Rc;
@@ -677,15 +677,15 @@ fn setup_test_db() -> (Rc<MvStore<TestClock>>, u64) {
(db, tx_id)
}
fn setup_sequential_db() -> (Rc<MvStore<TestClock>>, u64) {
fn setup_lazy_db(initial_keys: &[i64]) -> (Rc<MvStore<TestClock>>, u64) {
let clock = TestClock::new(1);
let storage = Storage::new_noop();
let db = Rc::new(MvStore::new(clock, storage));
let tx_id = db.begin_tx();
let table_id = 1;
for i in 1..6 {
let id = RowID::new(table_id, i);
for i in initial_keys {
let id = RowID::new(table_id, *i);
let data = format!("row{i}").into_bytes();
let row = Row::new(id, data);
db.insert(tx_id, row).unwrap();
@@ -699,12 +699,13 @@ fn setup_sequential_db() -> (Rc<MvStore<TestClock>>, u64) {
#[test]
fn test_lazy_scan_cursor_basic() {
let (db, tx_id) = setup_sequential_db();
let (db, tx_id) = setup_lazy_db(&[1, 2, 3, 4, 5]);
let table_id = 1;
let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
let mut cursor = MvccLazyCursor::new(db.clone(), tx_id, table_id).unwrap();
// Check first row
assert!(cursor.forward());
assert!(!cursor.is_empty());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 1);
@@ -730,9 +731,10 @@ fn test_lazy_scan_cursor_with_gaps() {
let (db, tx_id) = setup_test_db();
let table_id = 1;
let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
let mut cursor = MvccLazyCursor::new(db.clone(), tx_id, table_id).unwrap();
// Check first row
assert!(cursor.forward());
assert!(!cursor.is_empty());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 5);
@@ -755,73 +757,11 @@ fn test_lazy_scan_cursor_with_gaps() {
}
#[test]
fn test_bucket_scan_cursor_basic() {
let (db, tx_id) = setup_sequential_db();
fn test_cursor_basic() {
let (db, tx_id) = setup_lazy_db(&[1, 2, 3, 4, 5]);
let table_id = 1;
// Create a bucket size that's smaller than the total rows
let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 3).unwrap();
// Check first row
assert!(!cursor.is_empty());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 1);
// Iterate through all rows
let mut count = 1;
let mut row_ids = Vec::new();
row_ids.push(row.id.row_id);
while cursor.forward() {
count += 1;
let row = cursor.current_row().unwrap().unwrap();
row_ids.push(row.id.row_id);
}
// Should have found 5 rows
assert_eq!(count, 5);
assert_eq!(row_ids, vec![1, 2, 3, 4, 5]);
// After the last row, is_empty should return true
assert!(cursor.is_empty());
}
#[test]
fn test_bucket_scan_cursor_with_gaps() {
let (db, tx_id) = setup_test_db();
let table_id = 1;
// Create a bucket size of 2 to force multiple bucket loads
let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 2).unwrap();
// Check first row
assert!(!cursor.is_empty());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 5);
// Test moving forward and checking IDs
let expected_ids = [5, 10, 15, 20, 30];
let mut row_ids = Vec::new();
row_ids.push(row.id.row_id);
while cursor.forward() {
let row = cursor.current_row().unwrap().unwrap();
row_ids.push(row.id.row_id);
}
// Should have all expected IDs
assert_eq!(row_ids, expected_ids);
// After the last row, is_empty should return true
assert!(cursor.is_empty());
}
#[test]
fn test_scan_cursor_basic() {
let (db, tx_id) = setup_sequential_db();
let table_id = 1;
let mut cursor = ScanCursor::new(db.clone(), tx_id, table_id).unwrap();
let mut cursor = MvccLazyCursor::new(db.clone(), tx_id, table_id).unwrap();
cursor.forward();
@@ -855,30 +795,20 @@ fn test_cursor_with_empty_table() {
let table_id = 1; // Empty table
// Test LazyScanCursor with empty table
let cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
assert!(cursor.is_empty());
assert!(cursor.current_row_id().is_none());
// Test BucketScanCursor with empty table
let cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 10).unwrap();
assert!(cursor.is_empty());
assert!(cursor.current_row_id().is_none());
// Test ScanCursor with empty table
let mut cursor = ScanCursor::new(db.clone(), tx_id, table_id).unwrap();
cursor.forward();
let cursor = MvccLazyCursor::new(db.clone(), tx_id, table_id).unwrap();
assert!(cursor.is_empty());
assert!(cursor.current_row_id().is_none());
}
#[test]
fn test_cursor_modification_during_scan() {
let (db, tx_id) = setup_sequential_db();
let (db, tx_id) = setup_lazy_db(&[1, 2, 3, 4, 5]);
let table_id = 1;
let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
let mut cursor = MvccLazyCursor::new(db.clone(), tx_id, table_id).unwrap();
// Read first row
assert!(cursor.forward());
let first_row = cursor.current_row().unwrap().unwrap();
assert_eq!(first_row.id.row_id, 1);
@@ -888,56 +818,22 @@ fn test_cursor_modification_during_scan() {
let new_row = Row::new(new_row_id, new_row_data);
cursor.insert(new_row).unwrap();
// Continue scanning - the cursor should still work correctly
cursor.forward(); // Move to 2
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 2);
cursor.forward(); // Move to 3 (our new row)
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 3);
assert_eq!(row.data, b"new_row".to_vec());
// Continue scanning - the cursor should still work correctly
cursor.forward(); // Move to 4
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 4);
}
#[test]
fn test_bucket_scan_cursor_next_bucket() {
let (db, tx_id) = setup_test_db();
let table_id = 1;
// Create a bucket size of 1 to force bucket loading for each row
let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 1).unwrap();
// Get the first row
assert!(!cursor.is_empty());
cursor.forward(); // Move to 5 (our new row)
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 5);
// Move to the next row - this should trigger next_bucket()
assert!(cursor.forward());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 10);
// Move to the next row again
assert!(cursor.forward());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 15);
// Continue to the end
assert!(cursor.forward());
assert_eq!(cursor.current_row().unwrap().unwrap().id.row_id, 20);
assert!(cursor.forward());
assert_eq!(cursor.current_row().unwrap().unwrap().id.row_id, 30);
// Should be no more rows
assert!(!cursor.forward());
assert!(cursor.is_empty());
}
/* States described in the Hekaton paper *for serializability*:
Table 1: Case analysis of action to take when version Vs