MVCC: remove reliance on BTreeCursor::has_record()

This commit is contained in:
Jussi Saurio
2025-09-12 15:59:31 +03:00
parent ff23f9795b
commit 305b2f55ae
2 changed files with 161 additions and 11 deletions

View File

@@ -4352,14 +4352,11 @@ impl BTreeCursor {
#[instrument(skip(self), level = Level::DEBUG)]
pub fn rowid(&self) -> Result<IOResult<Option<i64>>> {
if let Some(mv_cursor) = &self.mv_cursor {
if self.has_record.get() {
let mut mv_cursor = mv_cursor.borrow_mut();
return Ok(IOResult::Done(
mv_cursor.current_row_id().map(|rowid| rowid.row_id),
));
} else {
let mut mv_cursor = mv_cursor.borrow_mut();
let Some(rowid) = mv_cursor.current_row_id() else {
return Ok(IOResult::Done(None));
}
};
return Ok(IOResult::Done(Some(rowid.row_id)));
}
if self.get_null_flag() {
return Ok(IOResult::Done(None));
@@ -4407,7 +4404,7 @@ impl BTreeCursor {
/// back.
#[instrument(skip(self), level = Level::DEBUG)]
pub fn record(&self) -> Result<IOResult<Option<Ref<ImmutableRecord>>>> {
if !self.has_record.get() {
if !self.has_record.get() && self.mv_cursor.is_none() {
return Ok(IOResult::Done(None));
}
let invalidated = self
@@ -4421,9 +4418,11 @@ impl BTreeCursor {
.unwrap();
return Ok(IOResult::Done(Some(record_ref)));
}
if self.mv_cursor.is_some() {
let mut mv_cursor = self.mv_cursor.as_ref().unwrap().borrow_mut();
let row = mv_cursor.current_row().unwrap().unwrap();
if let Some(mv_cursor) = &self.mv_cursor {
let mut mv_cursor = mv_cursor.borrow_mut();
let Some(row) = mv_cursor.current_row()? else {
return Ok(IOResult::Done(None));
};
self.get_immutable_record_or_create()
.as_mut()
.unwrap()

View File

@@ -244,3 +244,154 @@ fn test_mvcc_transactions_deferred() {
assert_eq!(*row.get::<&Value>(0).unwrap(), Value::Integer(2));
}
}
#[test]
fn test_mvcc_insert_select_basic() {
let tmp_db = TempDatabase::new_with_opts(
"test_mvcc_update_basic.db",
turso_core::DatabaseOpts::new().with_mvcc(true),
);
let conn1 = tmp_db.connect_limbo();
conn1
.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)")
.unwrap();
conn1
.execute("INSERT INTO test (id, value) VALUES (1, 'first')")
.unwrap();
let stmt = conn1
.query("SELECT * FROM test WHERE id = 1")
.unwrap()
.unwrap();
let row = helper_read_single_row(stmt);
assert_eq!(row, vec![Value::Integer(1), Value::build_text("first")]);
}
#[test]
fn test_mvcc_update_basic() {
let tmp_db = TempDatabase::new_with_opts(
"test_mvcc_update_basic.db",
turso_core::DatabaseOpts::new().with_mvcc(true),
);
let conn1 = tmp_db.connect_limbo();
conn1
.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)")
.unwrap();
conn1
.execute("INSERT INTO test (id, value) VALUES (1, 'first')")
.unwrap();
let stmt = conn1
.query("SELECT value FROM test WHERE id = 1")
.unwrap()
.unwrap();
let row = helper_read_single_row(stmt);
assert_eq!(row, vec![Value::build_text("first")]);
conn1
.execute("UPDATE test SET value = 'second' WHERE id = 1")
.unwrap();
let stmt = conn1
.query("SELECT value FROM test WHERE id = 1")
.unwrap()
.unwrap();
let row = helper_read_single_row(stmt);
assert_eq!(row, vec![Value::build_text("second")]);
}
#[test]
#[ignore]
// This test panics: cannot start a new read tx without ending an existing one, lock_value=0, expected=18446744073709551615
fn test_mvcc_begin_concurrent_smoke() {
let tmp_db = TempDatabase::new_with_opts(
"test_mvcc_begin_concurrent_smoke.db",
turso_core::DatabaseOpts::new().with_mvcc(true),
);
let conn1 = tmp_db.connect_limbo();
conn1.execute("BEGIN CONCURRENT").unwrap();
conn1
.execute("CREATE TABLE test (id INTEGER, value TEXT)")
.unwrap();
conn1.execute("COMMIT").unwrap();
}
#[test]
#[ignore]
// This test panics: cannot start a new read tx without ending an existing one, lock_value=1, expected=18446744073709551615
fn test_mvcc_concurrent_insert_basic() {
let tmp_db = TempDatabase::new_with_opts(
"test_mvcc_update_basic.db",
turso_core::DatabaseOpts::new().with_mvcc(true),
);
let conn1 = tmp_db.connect_limbo();
let conn2 = tmp_db.connect_limbo();
conn1
.execute("CREATE TABLE test (id INTEGER, value TEXT)")
.unwrap();
conn1.execute("BEGIN CONCURRENT").unwrap();
conn2.execute("BEGIN CONCURRENT").unwrap();
conn1
.execute("INSERT INTO test (id, value) VALUES (1, 'first')")
.unwrap();
conn2
.execute("INSERT INTO test (id, value) VALUES (2, 'second')")
.unwrap();
conn1.execute("COMMIT").unwrap();
conn2.execute("COMMIT").unwrap();
let stmt = conn1.query("SELECT * FROM test").unwrap().unwrap();
let rows = helper_read_all_rows(stmt);
assert_eq!(
rows,
vec![
vec![Value::Integer(1), Value::build_text("first")],
vec![Value::Integer(2), Value::build_text("second")],
]
);
}
fn helper_read_all_rows(mut stmt: turso_core::Statement) -> Vec<Vec<Value>> {
let mut ret = Vec::new();
loop {
match stmt.step().unwrap() {
StepResult::Row => {
ret.push(stmt.row().unwrap().get_values().cloned().collect());
}
StepResult::IO => stmt.run_once().unwrap(),
StepResult::Done => break,
StepResult::Busy => panic!("database is busy"),
StepResult::Interrupt => panic!("interrupted"),
}
}
ret
}
fn helper_read_single_row(mut stmt: turso_core::Statement) -> Vec<Value> {
let mut read_count = 0;
let mut ret = None;
loop {
match stmt.step().unwrap() {
StepResult::Row => {
assert_eq!(read_count, 0);
read_count += 1;
let row = stmt.row().unwrap();
ret = Some(row.get_values().cloned().collect());
}
StepResult::IO => stmt.run_once().unwrap(),
StepResult::Done => break,
StepResult::Busy => panic!("database is busy"),
StepResult::Interrupt => panic!("interrupted"),
}
}
ret.unwrap()
}