Merge ' core/mvcc: fix new rowid on restart' from Pere Diaz Bou

Next rowid was being tracked globally for all tables and restarted to 0
every time database was opened

Closes #2425
This commit is contained in:
Pekka Enberg
2025-08-04 14:37:13 +03:00
committed by GitHub
6 changed files with 161 additions and 28 deletions

View File

@@ -130,4 +130,25 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
pub fn rewind(&mut self) {
self.current_pos = CursorPosition::BeforeFirst;
}
pub fn last(&mut self) {
let last_rowid = self.db.get_last_rowid(self.table_id);
if let Some(last_rowid) = last_rowid {
self.current_pos = CursorPosition::Loaded(RowID {
table_id: self.table_id,
row_id: last_rowid,
});
} else {
self.current_pos = CursorPosition::BeforeFirst;
}
}
pub fn get_next_rowid(&mut self) -> i64 {
self.last();
match self.current_pos {
CursorPosition::Loaded(id) => id.row_id + 1,
CursorPosition::BeforeFirst => i64::MIN,
CursorPosition::End => i64::MAX,
}
}
}

View File

@@ -15,6 +15,7 @@ use parking_lot::RwLock;
use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::Bound;
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
@@ -1222,6 +1223,18 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
Ok(())
}
pub fn get_last_rowid(&self, table_id: u64) -> Option<i64> {
let last_rowid = self
.rows
.upper_bound(Bound::Included(&RowID {
table_id,
row_id: i64::MAX,
}))
.map(|entry| Some(entry.key().row_id))
.unwrap_or(None);
last_rowid
}
}
/// A write-write conflict happens when transaction T_current attempts to update a

View File

@@ -1,13 +1,16 @@
use super::*;
use crate::io::PlatformIO;
use crate::mvcc::clock::LocalClock;
pub(crate) struct MvccTestDbNoConn {
pub(crate) db: Arc<Database>,
pub(crate) db: Option<Arc<Database>>,
path: Option<String>,
// Stored mainly to not drop the temp dir before the test is done.
_temp_dir: Option<tempfile::TempDir>,
}
pub(crate) struct MvccTestDb {
pub(crate) mvcc_store: Arc<MvStore<LocalClock>>,
pub(crate) _db: Arc<Database>,
pub(crate) db: Arc<Database>,
pub(crate) conn: Arc<Connection>,
}
@@ -19,7 +22,7 @@ impl MvccTestDb {
let mvcc_store = db.mv_store.as_ref().unwrap().clone();
Self {
mvcc_store,
_db: db,
db,
conn,
}
}
@@ -29,7 +32,50 @@ impl MvccTestDbNoConn {
pub fn new() -> Self {
let io = Arc::new(MemoryIO::new());
let db = Database::open_file(io.clone(), ":memory:", true, true).unwrap();
Self { db }
Self {
db: Some(db),
path: None,
_temp_dir: None,
}
}
/// Opens a database with a file
pub fn new_with_random_db() -> Self {
let temp_dir = tempfile::TempDir::new().unwrap();
let path = temp_dir
.path()
.join(format!("test_{}", rand::random::<u64>()));
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
let io = Arc::new(PlatformIO::new().unwrap());
println!("path: {}", path.as_os_str().to_str().unwrap());
let db = Database::open_file(io.clone(), path.as_os_str().to_str().unwrap(), true, true)
.unwrap();
Self {
db: Some(db),
path: Some(path.to_str().unwrap().to_string()),
_temp_dir: Some(temp_dir),
}
}
/// Restarts the database, make sure there is no connection to the database open before calling this!
pub fn restart(&mut self) {
let io = Arc::new(PlatformIO::new().unwrap());
let path = self.path.as_ref().unwrap();
let db = Database::open_file(io.clone(), path, true, true).unwrap();
self.db.replace(db);
}
/// Asumes there is a database open
pub fn get_db(&self) -> Arc<Database> {
self.db.as_ref().unwrap().clone()
}
pub fn connect(&self) -> Arc<Connection> {
self.get_db().connect().unwrap()
}
pub fn get_mvcc_store(&self) -> Arc<MvStore<LocalClock>> {
self.get_db().mv_store.as_ref().unwrap().clone()
}
}
@@ -292,7 +338,7 @@ fn test_dirty_write() {
.unwrap();
assert_eq!(tx1_row, row);
let conn2 = db._db.connect().unwrap();
let conn2 = db.db.connect().unwrap();
// T2 attempts to delete row with ID 1, but fails because T1 has not committed.
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let tx2_row = generate_simple_string_row(1, 1, "World");
@@ -325,7 +371,7 @@ fn test_dirty_read() {
db.mvcc_store.insert(tx1, row1).unwrap();
// T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed.
let conn2 = db._db.connect().unwrap();
let conn2 = db.db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let row2 = db
.mvcc_store
@@ -351,7 +397,7 @@ fn test_dirty_read_deleted() {
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
// T2 deletes row with ID 1, but does not commit.
let conn2 = db._db.connect().unwrap();
let conn2 = db.db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
assert!(db
.mvcc_store
@@ -366,7 +412,7 @@ fn test_dirty_read_deleted() {
.unwrap());
// T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed.
let conn3 = db._db.connect().unwrap();
let conn3 = db.db.connect().unwrap();
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
let row = db
.mvcc_store
@@ -405,7 +451,7 @@ fn test_fuzzy_read() {
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
// T2 reads the row with ID 1 within an active transaction.
let conn2 = db._db.connect().unwrap();
let conn2 = db.db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let row = db
.mvcc_store
@@ -421,7 +467,7 @@ fn test_fuzzy_read() {
assert_eq!(tx1_row, row);
// T3 updates the row and commits.
let conn3 = db._db.connect().unwrap();
let conn3 = db.db.connect().unwrap();
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
let tx3_row = generate_simple_string_row(1, 1, "Second");
db.mvcc_store
@@ -475,7 +521,7 @@ fn test_lost_update() {
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
// T2 attempts to update row ID 1 within an active transaction.
let conn2 = db._db.connect().unwrap();
let conn2 = db.db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let tx2_row = generate_simple_string_row(1, 1, "World");
assert!(db
@@ -484,7 +530,7 @@ fn test_lost_update() {
.unwrap());
// T3 also attempts to update row ID 1 within an active transaction.
let conn3 = db._db.connect().unwrap();
let conn3 = db.db.connect().unwrap();
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
let tx3_row = generate_simple_string_row(1, 1, "Hello, world!");
assert!(matches!(
@@ -499,7 +545,7 @@ fn test_lost_update() {
Err(LimboError::TxTerminated)
));
let conn4 = db._db.connect().unwrap();
let conn4 = db.db.connect().unwrap();
let tx4 = db.mvcc_store.begin_tx(conn4.pager.borrow().clone());
let row = db
.mvcc_store
@@ -528,7 +574,7 @@ fn test_committed_visibility() {
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
// but I like more money, so let me try adding $10 more
let conn2 = db._db.connect().unwrap();
let conn2 = db.db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let tx2_row = generate_simple_string_row(1, 1, "20");
assert!(db
@@ -549,7 +595,7 @@ fn test_committed_visibility() {
assert_eq!(row, tx2_row);
// can I check how much money I have?
let conn3 = db._db.connect().unwrap();
let conn3 = db.db.connect().unwrap();
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
let row = db
.mvcc_store
@@ -572,7 +618,7 @@ fn test_future_row() {
let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let conn2 = db._db.connect().unwrap();
let conn2 = db.db.connect().unwrap();
let tx2 = db.mvcc_store.begin_tx(conn2.pager.borrow().clone());
let tx2_row = generate_simple_string_row(1, 1, "Hello");
db.mvcc_store.insert(tx2, tx2_row).unwrap();
@@ -682,12 +728,12 @@ pub(crate) fn commit_tx_no_conn(
conn: &Arc<Connection>,
) -> Result<(), LimboError> {
let mut sm = db
.db
.get_db()
.get_mv_store()
.unwrap()
.commit_tx(tx_id, conn.pager.borrow().clone(), conn)
.unwrap();
let result = sm.step(db.db.mv_store.as_ref().unwrap())?;
let result = sm.step(&db.get_mvcc_store())?;
assert!(sm.is_finalized());
match result {
TransitionResult::Done(()) => Ok(()),
@@ -1028,3 +1074,46 @@ fn test_snapshot_isolation_tx_visible1() {
Some(TxTimestampOrID::TxID(7))
));
}
#[test]
fn test_restart() {
let mut db = MvccTestDbNoConn::new_with_random_db();
{
let conn = db.connect();
let mvcc_store = db.get_mvcc_store();
let tx_id = mvcc_store.begin_tx(conn.pager.borrow().clone());
let row = generate_simple_string_row(1, 1, "foo");
mvcc_store.insert(tx_id, row).unwrap();
commit_tx(mvcc_store.clone(), &conn, tx_id).unwrap();
conn.close().unwrap();
}
db.restart();
{
let conn = db.connect();
let mvcc_store = db.get_mvcc_store();
let tx_id = mvcc_store.begin_tx(conn.pager.borrow().clone());
let row = generate_simple_string_row(1, 2, "bar");
mvcc_store.insert(tx_id, row).unwrap();
commit_tx(mvcc_store.clone(), &conn, tx_id).unwrap();
let tx_id = mvcc_store.begin_tx(conn.pager.borrow().clone());
let row = mvcc_store.read(tx_id, RowID::new(1, 2)).unwrap().unwrap();
let record = get_record_value(&row);
match record.get_value(0).unwrap() {
RefValue::Text(text) => {
assert_eq!(text.as_str(), "bar");
}
_ => panic!("Expected Text value"),
}
conn.close().unwrap();
}
}
fn get_record_value(row: &Row) -> ImmutableRecord {
let mut record = ImmutableRecord::new(1024);
record.start_serialization(&row.data);
record
}

View File

@@ -62,8 +62,8 @@ mod tests {
let th1 = {
let db = db.clone();
std::thread::spawn(move || {
let conn = db.db.connect().unwrap();
let mvcc_store = db.db.mv_store.as_ref().unwrap().clone();
let conn = db.get_db().connect().unwrap();
let mvcc_store = db.get_db().mv_store.as_ref().unwrap().clone();
for _ in 0..iterations {
let tx = mvcc_store.begin_tx(conn.pager.borrow().clone());
let id = IDS.fetch_add(1, Ordering::SeqCst);
@@ -83,8 +83,8 @@ mod tests {
};
let th2 = {
std::thread::spawn(move || {
let conn = db.db.connect().unwrap();
let mvcc_store = db.db.mv_store.as_ref().unwrap().clone();
let conn = db.get_db().connect().unwrap();
let mvcc_store = db.get_db().mv_store.as_ref().unwrap().clone();
for _ in 0..iterations {
let tx = mvcc_store.begin_tx(conn.pager.borrow().clone());
let id = IDS.fetch_add(1, Ordering::SeqCst);
@@ -116,8 +116,8 @@ mod tests {
let work = |prefix: &'static str| {
let db = db.clone();
std::thread::spawn(move || {
let conn = db.db.connect().unwrap();
let mvcc_store = db.db.mv_store.as_ref().unwrap().clone();
let conn = db.get_db().connect().unwrap();
let mvcc_store = db.get_db().mv_store.as_ref().unwrap().clone();
let mut failed_upserts = 0;
for i in 0..iterations {
if i % 1000 == 0 {

View File

@@ -5496,6 +5496,10 @@ impl BTreeCursor {
self.pager
.do_allocate_page(page_type, offset, BtreePageAllocMode::Any)
}
pub fn get_mvcc_cursor(&self) -> Rc<RefCell<MvCursor>> {
self.mv_cursor.as_ref().unwrap().clone()
}
}
#[derive(Debug, thiserror::Error)]

View File

@@ -15,6 +15,7 @@ use crate::util::{normalize_ident, IOExt as _};
use crate::vdbe::insn::InsertFlags;
use crate::vdbe::registers_to_ref_values;
use crate::vector::{vector_concat, vector_slice};
use crate::MvCursor;
use crate::{
error::{
LimboError, SQLITE_CONSTRAINT, SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY,
@@ -61,8 +62,7 @@ use crate::{
};
use crate::{
info, turso_assert, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult,
TransactionState,
info, turso_assert, BufferPool, OpenFlags, RefValue, Row, StepResult, TransactionState,
};
use super::{
@@ -5481,7 +5481,13 @@ pub fn op_new_rowid(
};
if let Some(mv_store) = mv_store {
let rowid = mv_store.get_next_rowid();
let rowid = {
let mut cursor = state.get_cursor(*cursor);
let cursor = cursor.as_btree_mut();
let mvcc_cursor = cursor.get_mvcc_cursor();
let mut mvcc_cursor = RefCell::borrow_mut(&mvcc_cursor);
mvcc_cursor.get_next_rowid()
};
state.registers[*rowid_reg] = Register::Value(Value::Integer(rowid));
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);