diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 1098a0b39..e62535526 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -4352,14 +4352,11 @@ impl BTreeCursor { #[instrument(skip(self), level = Level::DEBUG)] pub fn rowid(&self) -> Result>> { if let Some(mv_cursor) = &self.mv_cursor { - if self.has_record.get() { - let mut mv_cursor = mv_cursor.borrow_mut(); - return Ok(IOResult::Done( - mv_cursor.current_row_id().map(|rowid| rowid.row_id), - )); - } else { + let mut mv_cursor = mv_cursor.borrow_mut(); + let Some(rowid) = mv_cursor.current_row_id() else { return Ok(IOResult::Done(None)); - } + }; + return Ok(IOResult::Done(Some(rowid.row_id))); } if self.get_null_flag() { return Ok(IOResult::Done(None)); @@ -4407,7 +4404,7 @@ impl BTreeCursor { /// back. #[instrument(skip(self), level = Level::DEBUG)] pub fn record(&self) -> Result>>> { - if !self.has_record.get() { + if !self.has_record.get() && self.mv_cursor.is_none() { return Ok(IOResult::Done(None)); } let invalidated = self @@ -4421,9 +4418,11 @@ impl BTreeCursor { .unwrap(); return Ok(IOResult::Done(Some(record_ref))); } - if self.mv_cursor.is_some() { - let mut mv_cursor = self.mv_cursor.as_ref().unwrap().borrow_mut(); - let row = mv_cursor.current_row().unwrap().unwrap(); + if let Some(mv_cursor) = &self.mv_cursor { + let mut mv_cursor = mv_cursor.borrow_mut(); + let Some(row) = mv_cursor.current_row()? else { + return Ok(IOResult::Done(None)); + }; self.get_immutable_record_or_create() .as_mut() .unwrap() diff --git a/tests/integration/query_processing/test_transactions.rs b/tests/integration/query_processing/test_transactions.rs index e969eaef7..8ef366dfe 100644 --- a/tests/integration/query_processing/test_transactions.rs +++ b/tests/integration/query_processing/test_transactions.rs @@ -244,3 +244,154 @@ fn test_mvcc_transactions_deferred() { assert_eq!(*row.get::<&Value>(0).unwrap(), Value::Integer(2)); } } + +#[test] +fn test_mvcc_insert_select_basic() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_update_basic.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn1 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + conn1 + .execute("INSERT INTO test (id, value) VALUES (1, 'first')") + .unwrap(); + + let stmt = conn1 + .query("SELECT * FROM test WHERE id = 1") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::Integer(1), Value::build_text("first")]); +} + +#[test] +fn test_mvcc_update_basic() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_update_basic.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn1 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + conn1 + .execute("INSERT INTO test (id, value) VALUES (1, 'first')") + .unwrap(); + + let stmt = conn1 + .query("SELECT value FROM test WHERE id = 1") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("first")]); + + conn1 + .execute("UPDATE test SET value = 'second' WHERE id = 1") + .unwrap(); + + let stmt = conn1 + .query("SELECT value FROM test WHERE id = 1") + .unwrap() + .unwrap(); + let row = helper_read_single_row(stmt); + assert_eq!(row, vec![Value::build_text("second")]); +} + +#[test] +#[ignore] +// This test panics: cannot start a new read tx without ending an existing one, lock_value=0, expected=18446744073709551615 +fn test_mvcc_begin_concurrent_smoke() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_begin_concurrent_smoke.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn1 = tmp_db.connect_limbo(); + conn1.execute("BEGIN CONCURRENT").unwrap(); + conn1 + .execute("CREATE TABLE test (id INTEGER, value TEXT)") + .unwrap(); + conn1.execute("COMMIT").unwrap(); +} + +#[test] +#[ignore] +// This test panics: cannot start a new read tx without ending an existing one, lock_value=1, expected=18446744073709551615 +fn test_mvcc_concurrent_insert_basic() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_update_basic.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn1 = tmp_db.connect_limbo(); + let conn2 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER, value TEXT)") + .unwrap(); + + conn1.execute("BEGIN CONCURRENT").unwrap(); + conn2.execute("BEGIN CONCURRENT").unwrap(); + + conn1 + .execute("INSERT INTO test (id, value) VALUES (1, 'first')") + .unwrap(); + conn2 + .execute("INSERT INTO test (id, value) VALUES (2, 'second')") + .unwrap(); + + conn1.execute("COMMIT").unwrap(); + conn2.execute("COMMIT").unwrap(); + + let stmt = conn1.query("SELECT * FROM test").unwrap().unwrap(); + let rows = helper_read_all_rows(stmt); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::build_text("first")], + vec![Value::Integer(2), Value::build_text("second")], + ] + ); +} + +fn helper_read_all_rows(mut stmt: turso_core::Statement) -> Vec> { + let mut ret = Vec::new(); + loop { + match stmt.step().unwrap() { + StepResult::Row => { + ret.push(stmt.row().unwrap().get_values().cloned().collect()); + } + StepResult::IO => stmt.run_once().unwrap(), + StepResult::Done => break, + StepResult::Busy => panic!("database is busy"), + StepResult::Interrupt => panic!("interrupted"), + } + } + ret +} + +fn helper_read_single_row(mut stmt: turso_core::Statement) -> Vec { + let mut read_count = 0; + let mut ret = None; + loop { + match stmt.step().unwrap() { + StepResult::Row => { + assert_eq!(read_count, 0); + read_count += 1; + let row = stmt.row().unwrap(); + ret = Some(row.get_values().cloned().collect()); + } + StepResult::IO => stmt.run_once().unwrap(), + StepResult::Done => break, + StepResult::Busy => panic!("database is busy"), + StepResult::Interrupt => panic!("interrupted"), + } + } + + ret.unwrap() +}