Merge 'Reduce MVCC cursor memory consumption' from Ihor Andrianov

Fix memory hogging in MVCC scan cursor #1104
The current scan cursor loads all row ids for a table at once, which consumes
excessive memory on big tables.
This adds BucketScanCursor, which loads row ids in configurable batches,
keeping memory usage bounded while retaining decent performance. It's a
drop-in replacement with the same interface.
Also included is LazyScanCursor, an alternative that fetches one row id at a
time; it's less efficient because each step requires an O(log n) skipmap
lookup.
BucketScanCursor is the recommended approach for most use cases. WDYT?
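
For illustration, a minimal sketch of the batched scan pattern this interface supports (not part of the patch; the `scan_table` helper, the `process` callback, and the 4096 bucket size are placeholders chosen for the example):

// Sketch only: assumes the crate's Result, Row, Rc<MvStore<Clock>>, LogicalClock,
// and BucketScanCursor are in scope.
fn scan_table<Clock: LogicalClock>(
    db: Rc<MvStore<Clock>>,
    tx_id: u64,
    table_id: u64,
    mut process: impl FnMut(Row),
) -> Result<()> {
    // Load row ids 4096 at a time instead of materializing every id for the table.
    let mut cursor = BucketScanCursor::new(db, tx_id, table_id, 4096)?;
    // current_row() returns Ok(None) once the cursor is exhausted.
    while let Some(row) = cursor.current_row()? {
        process(row);
        // forward() advances within the current bucket and loads the next bucket
        // when it runs out; it returns false when no rows remain.
        if !cursor.forward() {
            break;
        }
    }
    cursor.close()
}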

Closes #1178
Pekka Enberg
2025-03-27 08:56:35 +02:00
3 changed files with 514 additions and 0 deletions


@@ -54,3 +54,147 @@ impl<Clock: LogicalClock> ScanCursor<Clock> {
self.index >= self.row_ids.len()
}
}
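/// Scan cursor that resolves row ids lazily, one at a time, via skipmap lookups,
/// instead of materializing the full row id set for the table up front.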
#[derive(Debug)]
pub struct LazyScanCursor<Clock: LogicalClock> {
pub db: Rc<MvStore<Clock>>,
pub current_pos: Option<RowID>,
pub prev_pos: Option<RowID>,
table_id: u64,
tx_id: u64,
}
impl<Clock: LogicalClock> LazyScanCursor<Clock> {
pub fn new(db: Rc<MvStore<Clock>>, tx_id: u64, table_id: u64) -> Result<LazyScanCursor<Clock>> {
let current_pos = db.get_next_row_id_for_table(table_id, 0);
Ok(Self {
db,
tx_id,
current_pos,
prev_pos: None,
table_id,
})
}
pub fn insert(&self, row: Row) -> Result<()> {
self.db.insert(self.tx_id, row)
}
pub fn current_row_id(&self) -> Option<RowID> {
self.current_pos
}
pub fn current_row(&self) -> Result<Option<Row>> {
if let Some(id) = self.current_pos {
self.db.read(self.tx_id, id)
} else {
Ok(None)
}
}
pub fn close(self) -> Result<()> {
Ok(())
}
pub fn forward(&mut self) -> bool {
self.prev_pos = self.current_pos;
if let Some(row_id) = self.prev_pos {
let next_id = row_id.row_id + 1;
self.current_pos = self.db.get_next_row_id_for_table(self.table_id, next_id);
tracing::trace!("lazy_scan_cursor.forward(current_pos={:?})", self.current_pos);
self.current_pos.is_some()
} else {
false
}
}
pub fn is_empty(&self) -> bool {
self.current_pos.is_none()
}
}
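/// Scan cursor that loads row ids in fixed-size buckets, bounding memory use to
/// at most `bucket_size` ids at a time while exposing the same interface as the
/// eager `ScanCursor`.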
#[derive(Debug)]
pub struct BucketScanCursor<Clock: LogicalClock> {
pub db: Rc<MvStore<Clock>>,
pub bucket: Vec<RowID>,
bucket_size: u64,
table_id: u64,
tx_id: u64,
index: usize,
}
impl<Clock: LogicalClock> BucketScanCursor<Clock> {
pub fn new(
db: Rc<MvStore<Clock>>,
tx_id: u64,
table_id: u64,
size: u64,
) -> Result<BucketScanCursor<Clock>> {
let mut bucket = Vec::with_capacity(size as usize);
db.get_row_id_range(table_id, 0, &mut bucket, size)?;
Ok(Self {
db,
tx_id,
bucket,
bucket_size: size,
table_id,
index: 0,
})
}
pub fn insert(&self, row: Row) -> Result<()> {
self.db.insert(self.tx_id, row)
}
pub fn current_row_id(&self) -> Option<RowID> {
if self.index >= self.bucket.len() {
return None;
}
Some(self.bucket[self.index])
}
pub fn current_row(&self) -> Result<Option<Row>> {
if self.index >= self.bucket.len() {
return Ok(None);
}
let id = self.bucket[self.index];
self.db.read(self.tx_id, id)
}
pub fn close(self) -> Result<()> {
Ok(())
}
pub fn forward(&mut self) -> bool {
self.index += 1;
if self.index < self.bucket.len() {
true
} else {
self.next_bucket().unwrap_or_default()
}
}
pub fn is_empty(&self) -> bool {
self.index >= self.bucket.len()
}
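/// Loads the next bucket of row ids, starting just past the last id in the
/// current bucket. Returns `Ok(true)` if any rows were loaded.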
fn next_bucket(&mut self) -> Result<bool> {
let last_id = if !self.bucket.is_empty() {
Some(self.bucket[self.bucket.len() - 1].row_id + 1)
} else {
None
};
self.bucket.clear();
if let Some(next_id) = last_id {
self.db
.get_row_id_range(self.table_id, next_id, &mut self.bucket, self.bucket_size)?;
self.index = 0;
Ok(!self.bucket.is_empty())
} else {
Ok(false)
}
}
}


@@ -422,6 +422,58 @@ impl<Clock: LogicalClock> MvStore<Clock> {
.collect())
}
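/// Collects up to `max_items` row ids of `table_id`, starting at row id `start`,
/// into `bucket`.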
pub fn get_row_id_range(
&self,
table_id: u64,
start: u64,
bucket: &mut Vec<RowID>,
max_items: u64,
) -> Result<()> {
tracing::trace!(
"get_row_id_in_range(table_id={}, range_start={})",
table_id,
start,
);
let start_id = RowID {
table_id,
row_id: start,
};
let end_id = RowID {
table_id,
row_id: u64::MAX,
};
self.rows
.range(start_id..end_id)
.take(max_items as usize)
.for_each(|entry| bucket.push(*entry.key()));
Ok(())
}
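/// Returns the smallest row id of `table_id` that is greater than or equal to `start`.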
pub fn get_next_row_id_for_table(&self, table_id: u64, start: u64) -> Option<RowID> {
tracing::trace!(
"getting_next_id_for_table(table_id={}, range_start={})",
table_id,
start,
);
let min_bound = RowID {
table_id,
row_id: start,
};
let max_bound = RowID {
table_id,
row_id: u64::MAX,
};
self.rows
.range(min_bound..max_bound)
.next()
.map(|entry| *entry.key())
}
/// Begins a new transaction in the database.
///
/// This function starts a new transaction in the database and returns a `TxID` value


@@ -617,6 +617,324 @@ fn test_future_row() {
assert_eq!(row, None);
}
use crate::mvcc::clock::LogicalClock;
use crate::mvcc::cursor::{BucketScanCursor, LazyScanCursor, ScanCursor};
use crate::mvcc::database::{MvStore, Row, RowID};
use crate::mvcc::persistent_storage::Storage;
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
// Simple atomic clock implementation for testing
struct TestClock {
counter: AtomicU64,
}
impl TestClock {
fn new(start: u64) -> Self {
Self {
counter: AtomicU64::new(start),
}
}
}
impl LogicalClock for TestClock {
fn get_timestamp(&self) -> u64 {
self.counter.fetch_add(1, Ordering::SeqCst)
}
fn reset(&self, ts: u64) {
let current = self.counter.load(Ordering::SeqCst);
if ts > current {
self.counter.store(ts, Ordering::SeqCst);
}
}
}
fn setup_test_db() -> (Rc<MvStore<TestClock>>, u64) {
let clock = TestClock::new(1);
let storage = Storage::new_noop();
let db = Rc::new(MvStore::new(clock, storage));
let tx_id = db.begin_tx();
let table_id = 1;
let test_rows = [
(5, b"row5".to_vec()),
(10, b"row10".to_vec()),
(15, b"row15".to_vec()),
(20, b"row20".to_vec()),
(30, b"row30".to_vec()),
];
for (row_id, data) in test_rows.iter() {
let id = RowID::new(table_id, *row_id);
let row = Row::new(id, data.clone());
db.insert(tx_id, row).unwrap();
}
db.commit_tx(tx_id).unwrap();
let tx_id = db.begin_tx();
(db, tx_id)
}
fn setup_sequential_db() -> (Rc<MvStore<TestClock>>, u64) {
let clock = TestClock::new(1);
let storage = Storage::new_noop();
let db = Rc::new(MvStore::new(clock, storage));
let tx_id = db.begin_tx();
let table_id = 1;
for i in 1..6 {
let id = RowID::new(table_id, i);
let data = format!("row{}", i).into_bytes();
let row = Row::new(id, data);
db.insert(tx_id, row).unwrap();
}
db.commit_tx(tx_id).unwrap();
let tx_id = db.begin_tx();
(db, tx_id)
}
#[test]
fn test_lazy_scan_cursor_basic() {
let (db, tx_id) = setup_sequential_db();
let table_id = 1;
let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
// Check first row
assert!(!cursor.is_empty());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 1);
// Iterate through all rows
let mut count = 1;
while cursor.forward() {
count += 1;
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, count);
}
// Should have found 5 rows
assert_eq!(count, 5);
// After the last row, is_empty should return true
assert!(!cursor.forward());
assert!(cursor.is_empty());
}
#[test]
fn test_lazy_scan_cursor_with_gaps() {
let (db, tx_id) = setup_test_db();
let table_id = 1;
let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
// Check first row
assert!(!cursor.is_empty());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 5);
// Test moving forward and checking IDs
let expected_ids = [5, 10, 15, 20, 30];
let mut index = 0;
assert_eq!(cursor.current_row_id().unwrap().row_id, expected_ids[index]);
while cursor.forward() {
index += 1;
if index < expected_ids.len() {
assert_eq!(cursor.current_row_id().unwrap().row_id, expected_ids[index]);
}
}
// Should have visited all 5 expected ids (index ends at the last position)
assert_eq!(index, expected_ids.len() - 1);
}
#[test]
fn test_bucket_scan_cursor_basic() {
let (db, tx_id) = setup_sequential_db();
let table_id = 1;
// Create a bucket size that's smaller than the total rows
let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 3).unwrap();
// Check first row
assert!(!cursor.is_empty());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 1);
// Iterate through all rows
let mut count = 1;
let mut row_ids = Vec::new();
row_ids.push(row.id.row_id);
while cursor.forward() {
count += 1;
let row = cursor.current_row().unwrap().unwrap();
row_ids.push(row.id.row_id);
}
// Should have found 5 rows
assert_eq!(count, 5);
assert_eq!(row_ids, vec![1, 2, 3, 4, 5]);
// After the last row, is_empty should return true
assert!(cursor.is_empty());
}
#[test]
fn test_bucket_scan_cursor_with_gaps() {
let (db, tx_id) = setup_test_db();
let table_id = 1;
// Create a bucket size of 2 to force multiple bucket loads
let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 2).unwrap();
// Check first row
assert!(!cursor.is_empty());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 5);
// Test moving forward and checking IDs
let expected_ids = [5, 10, 15, 20, 30];
let mut row_ids = Vec::new();
row_ids.push(row.id.row_id);
while cursor.forward() {
let row = cursor.current_row().unwrap().unwrap();
row_ids.push(row.id.row_id);
}
// Should have all expected IDs
assert_eq!(row_ids, expected_ids);
// After the last row, is_empty should return true
assert!(cursor.is_empty());
}
#[test]
fn test_scan_cursor_basic() {
let (db, tx_id) = setup_sequential_db();
let table_id = 1;
let mut cursor = ScanCursor::new(db.clone(), tx_id, table_id).unwrap();
// Check first row
assert!(!cursor.is_empty());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 1);
// Iterate through all rows
let mut count = 1;
while cursor.forward() {
count += 1;
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, count);
}
// Should have found 5 rows
assert_eq!(count, 5);
// After the last row, is_empty should return true
assert!(!cursor.forward());
assert!(cursor.is_empty());
}
#[test]
fn test_cursor_with_empty_table() {
let clock = TestClock::new(1);
let storage = Storage::new_noop();
let db = Rc::new(MvStore::new(clock, storage));
let tx_id = db.begin_tx();
let table_id = 1; // Empty table
// Test LazyScanCursor with empty table
let cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
assert!(cursor.is_empty());
assert!(cursor.current_row_id().is_none());
// Test BucketScanCursor with empty table
let cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 10).unwrap();
assert!(cursor.is_empty());
assert!(cursor.current_row_id().is_none());
// Test ScanCursor with empty table
let cursor = ScanCursor::new(db.clone(), tx_id, table_id).unwrap();
assert!(cursor.is_empty());
assert!(cursor.current_row_id().is_none());
}
#[test]
fn test_cursor_modification_during_scan() {
let (db, tx_id) = setup_sequential_db();
let table_id = 1;
let mut cursor = LazyScanCursor::new(db.clone(), tx_id, table_id).unwrap();
// Read first row
let first_row = cursor.current_row().unwrap().unwrap();
assert_eq!(first_row.id.row_id, 1);
// Insert a new row with ID between existing rows
let new_row_id = RowID::new(table_id, 3);
let new_row_data = b"new_row".to_vec();
let new_row = Row::new(new_row_id, new_row_data);
cursor.insert(new_row).unwrap();
// Continue scanning - the cursor should still work correctly
cursor.forward(); // Move to 2
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 2);
cursor.forward(); // Move to 3 (our new row)
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 3);
assert_eq!(row.data, b"new_row".to_vec());
cursor.forward(); // Move to 4
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 4);
}
#[test]
fn test_bucket_scan_cursor_next_bucket() {
let (db, tx_id) = setup_test_db();
let table_id = 1;
// Create a bucket size of 1 to force bucket loading for each row
let mut cursor = BucketScanCursor::new(db.clone(), tx_id, table_id, 1).unwrap();
// Get the first row
assert!(!cursor.is_empty());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 5);
// Move to the next row - this should trigger next_bucket()
assert!(cursor.forward());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 10);
// Move to the next row again
assert!(cursor.forward());
let row = cursor.current_row().unwrap().unwrap();
assert_eq!(row.id.row_id, 15);
// Continue to the end
assert!(cursor.forward());
assert_eq!(cursor.current_row().unwrap().unwrap().id.row_id, 20);
assert!(cursor.forward());
assert_eq!(cursor.current_row().unwrap().unwrap().id.row_id, 30);
// Should be no more rows
assert!(!cursor.forward());
assert!(cursor.is_empty());
}
/* States described in the Hekaton paper *for serializability*:
Table 1: Case analysis of action to take when version Vs