diff --git a/bindings/go/rs_src/lib.rs b/bindings/go/rs_src/lib.rs index b2240a16b..297b5a7ce 100644 --- a/bindings/go/rs_src/lib.rs +++ b/bindings/go/rs_src/lib.rs @@ -25,7 +25,7 @@ pub unsafe extern "C" fn db_open(path: *const c_char) -> *mut c_void { p if p.contains(":memory:") => Arc::new(limbo_core::MemoryIO::new()), _ => Arc::new(limbo_core::PlatformIO::new().expect("Failed to create IO")), }; - let db = Database::open_file(io.clone(), path); + let db = Database::open_file(io.clone(), path, false); match db { Ok(db) => { let conn = db.connect().unwrap(); diff --git a/bindings/java/rs_src/limbo_db.rs b/bindings/java/rs_src/limbo_db.rs index ef33bf6e2..189ed090b 100644 --- a/bindings/java/rs_src/limbo_db.rs +++ b/bindings/java/rs_src/limbo_db.rs @@ -67,7 +67,7 @@ pub extern "system" fn Java_tech_turso_core_LimboDB_openUtf8<'local>( } }; - let db = match Database::open_file(io.clone(), &path) { + let db = match Database::open_file(io.clone(), &path, false) { Ok(db) => db, Err(e) => { set_err_msg_and_throw_exception(&mut env, obj, LIMBO_ETC, e.to_string()); diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index fa9045ef7..f6c505ba9 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -277,7 +277,7 @@ pub fn connect(path: &str) -> Result { io: Arc, path: &str, ) -> std::result::Result, PyErr> { - limbo_core::Database::open_file(io, path).map_err(|e| { + limbo_core::Database::open_file(io, path, false).map_err(|e| { PyErr::new::(format!("Failed to open database: {:?}", e)) }) } diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index d99e7bbb6..b65624d10 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -42,12 +42,12 @@ impl Builder { match self.path.as_str() { ":memory:" => { let io: Arc = Arc::new(limbo_core::MemoryIO::new()); - let db = limbo_core::Database::open_file(io, self.path.as_str())?; + let db = limbo_core::Database::open_file(io, self.path.as_str(), false)?; Ok(Database { inner: db }) } path => { let io: Arc = Arc::new(limbo_core::PlatformIO::new()?); - let db = limbo_core::Database::open_file(io, path)?; + let db = limbo_core::Database::open_file(io, path, false)?; Ok(Database { inner: db }) } } diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 6627b6fac..88827e79e 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -32,7 +32,7 @@ impl Database { let wal_path = format!("{}-wal", path); let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size).unwrap(); - let db = limbo_core::Database::open(io, page_io, wal_shared).unwrap(); + let db = limbo_core::Database::open(io, page_io, wal_shared, false).unwrap(); let conn = db.connect().unwrap(); Database { db, conn } } diff --git a/cli/app.rs b/cli/app.rs index 4d1470adf..999dd935d 100644 --- a/cli/app.rs +++ b/cli/app.rs @@ -51,6 +51,8 @@ pub struct Opts { \t'io-uring' when built for Linux with feature 'io_uring'\n" )] pub io: Io, + #[clap(long, help = "Enable experimental MVCC feature")] + pub experimental_mvcc: bool, } #[derive(Debug, Clone)] @@ -210,7 +212,7 @@ impl<'a> Limbo<'a> { _path => get_io(DbLocation::Path, opts.io)?, } }; - let db = Database::open_file(io.clone(), &db_file)?; + let db = Database::open_file(io.clone(), &db_file, opts.experimental_mvcc)?; let conn = db.connect().unwrap(); let h = LimboHelper::new(conn.clone(), io.clone()); rl.set_helper(Some(h)); @@ -412,7 +414,7 @@ impl<'a> Limbo<'a> { } }; self.io = Arc::clone(&io); - let db = Database::open_file(self.io.clone(), path)?; + let db = Database::open_file(self.io.clone(), path, self.opts.experimental_mvcc)?; self.conn = db.connect().unwrap(); self.opts.db_file = path.to_string(); Ok(()) diff --git a/cli/input.rs b/cli/input.rs index f1c1fd3f6..459b9ac2a 100644 --- a/cli/input.rs +++ b/cli/input.rs @@ -65,6 +65,7 @@ pub struct Settings { pub echo: bool, pub is_stdout: bool, pub io: Io, + pub experimental_mvcc: bool, } impl From<&Opts> for Settings { @@ -80,6 +81,7 @@ impl From<&Opts> for Settings { .as_ref() .map_or(":memory:".to_string(), |p| p.to_string_lossy().to_string()), io: opts.io, + experimental_mvcc: opts.experimental_mvcc, } } } diff --git a/core/benches/benchmark.rs b/core/benches/benchmark.rs index 57deca54e..93c071bcb 100644 --- a/core/benches/benchmark.rs +++ b/core/benches/benchmark.rs @@ -18,7 +18,7 @@ fn bench(criterion: &mut Criterion) { #[allow(clippy::arc_with_non_send_sync)] let io = Arc::new(PlatformIO::new().unwrap()); - let db = Database::open_file(io.clone(), "../testing/testing.db").unwrap(); + let db = Database::open_file(io.clone(), "../testing/testing.db", false).unwrap(); let limbo_conn = db.connect().unwrap(); let queries = [ diff --git a/core/lib.rs b/core/lib.rs index 67674e544..9632e1829 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -83,7 +83,12 @@ enum TransactionState { None, } +pub(crate) type MvStore = crate::mvcc::MvStore; + +pub(crate) type MvCursor = crate::mvcc::cursor::ScanCursor; + pub struct Database { + mv_store: Option>, schema: Arc>, // TODO: make header work without lock header: Arc>, @@ -101,7 +106,7 @@ unsafe impl Sync for Database {} impl Database { #[cfg(feature = "fs")] - pub fn open_file(io: Arc, path: &str) -> Result> { + pub fn open_file(io: Arc, path: &str, enable_mvcc: bool) -> Result> { use storage::wal::WalFileShared; let file = io.open_file(path, OpenFlags::Create, true)?; @@ -112,7 +117,7 @@ impl Database { io.run_once()?; let page_size = db_header.lock().unwrap().page_size; let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size)?; - Self::open(io, page_io, wal_shared) + Self::open(io, page_io, wal_shared, enable_mvcc) } #[allow(clippy::arc_with_non_send_sync)] @@ -120,6 +125,7 @@ impl Database { io: Arc, page_io: Arc, shared_wal: Arc>, + enable_mvcc: bool, ) -> Result> { let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; @@ -127,11 +133,20 @@ impl Database { let version = db_header.lock().unwrap().version_number; version.to_string() }); + let mv_store = if enable_mvcc { + Some(Rc::new(MvStore::new( + crate::mvcc::LocalClock::new(), + crate::mvcc::persistent_storage::Storage::new_noop(), + ))) + } else { + None + }; let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); let page_size = db_header.lock().unwrap().page_size; let header = db_header; let schema = Arc::new(RwLock::new(Schema::new())); let db = Database { + mv_store, schema: schema.clone(), header: header.clone(), shared_page_cache: shared_page_cache.clone(), @@ -149,7 +164,7 @@ impl Database { .try_write() .expect("lock on schema should succeed first try"); let syms = conn.syms.borrow(); - parse_schema_rows(rows, &mut schema, io, syms.deref())?; + parse_schema_rows(rows, &mut schema, io, syms.deref(), None)?; } Ok(db) } @@ -178,6 +193,7 @@ impl Database { header: self.header.clone(), last_insert_rowid: Cell::new(0), auto_commit: RefCell::new(true), + mv_transactions: RefCell::new(Vec::new()), transaction_state: RefCell::new(TransactionState::None), last_change: Cell::new(0), syms: RefCell::new(SymbolTable::new()), @@ -244,6 +260,7 @@ pub struct Connection { schema: Arc>, header: Arc>, auto_commit: RefCell, + mv_transactions: RefCell>, transaction_state: RefCell, last_insert_rowid: Cell, last_change: Cell, @@ -274,7 +291,11 @@ impl Connection { &syms, QueryMode::Normal, )?); - Ok(Statement::new(program, self.pager.clone())) + Ok(Statement::new( + program, + self._db.mv_store.clone(), + self.pager.clone(), + )) } Cmd::Explain(_stmt) => todo!(), Cmd::ExplainQueryPlan(_stmt) => todo!(), @@ -312,7 +333,7 @@ impl Connection { &syms, QueryMode::Normal, )?); - let stmt = Statement::new(program, self.pager.clone()); + let stmt = Statement::new(program, self._db.mv_store.clone(), self.pager.clone()); Ok(Some(stmt)) } Cmd::Explain(stmt) => { @@ -407,7 +428,7 @@ impl Connection { let mut state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len()); - program.step(&mut state, self.pager.clone())?; + program.step(&mut state, self._db.mv_store.clone(), self.pager.clone())?; } } } @@ -489,25 +510,36 @@ impl Connection { pub struct Statement { program: Rc, state: vdbe::ProgramState, + mv_store: Option>, pager: Rc, } impl Statement { - pub fn new(program: Rc, pager: Rc) -> Self { + pub fn new( + program: Rc, + mv_store: Option>, + pager: Rc, + ) -> Self { let state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len()); Self { program, state, + mv_store, pager, } } + pub fn set_mv_tx_id(&mut self, mv_tx_id: Option) { + self.state.mv_tx_id = mv_tx_id; + } + pub fn interrupt(&mut self) { self.state.interrupt(); } pub fn step(&mut self) -> Result { - self.program.step(&mut self.state, self.pager.clone()) + self.program + .step(&mut self.state, self.mv_store.clone(), self.pager.clone()) } pub fn num_columns(&self) -> usize { diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index b52fdd3d0..5fed0941f 100644 --- a/core/mvcc/cursor.rs +++ b/core/mvcc/cursor.rs @@ -1,17 +1,18 @@ use crate::mvcc::clock::LogicalClock; use crate::mvcc::database::{MvStore, Result, Row, RowID}; use std::fmt::Debug; +use std::rc::Rc; #[derive(Debug)] -pub struct ScanCursor<'a, Clock: LogicalClock> { - pub db: &'a MvStore, +pub struct ScanCursor { + pub db: Rc>, pub row_ids: Vec, pub index: usize, tx_id: u64, } -impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> { - pub fn new(db: &'a MvStore, tx_id: u64, table_id: u64) -> Result> { +impl ScanCursor { + pub fn new(db: Rc>, tx_id: u64, table_id: u64) -> Result> { let row_ids = db.scan_row_ids_for_table(table_id)?; Ok(Self { db, @@ -21,6 +22,10 @@ impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> { }) } + pub fn insert(&self, row: Row) -> Result<()> { + self.db.insert(self.tx_id, row) + } + pub fn current_row_id(&self) -> Option { if self.index >= self.row_ids.len() { return None; diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 21cf91f0a..ea19b4386 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -17,6 +17,12 @@ pub struct RowID { pub row_id: u64, } +impl RowID { + pub fn new(table_id: u64, row_id: u64) -> Self { + Self { table_id, row_id } + } +} + #[derive(Clone, Debug, PartialEq, PartialOrd)] pub struct Row { @@ -24,6 +30,12 @@ pub struct Row { pub data: Vec, } +impl Row { + pub fn new(id: RowID, data: Vec) -> Self { + Self { id, data } + } +} + /// A row version. #[derive(Clone, Debug, PartialEq)] pub struct RowVersion { @@ -242,6 +254,7 @@ impl MvStore { /// * `row` - the row object containing the values to be inserted. /// pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { + tracing::trace!("insert(tx_id={}, row.id={:?})", tx_id, row.id); let tx = self .txs .get(&tx_id) @@ -279,6 +292,7 @@ impl MvStore { /// /// Returns `true` if the row was successfully updated, and `false` otherwise. pub fn update(&self, tx_id: TxID, row: Row) -> Result { + tracing::trace!("update(tx_id={}, row.id={:?})", tx_id, row.id); if !self.delete(tx_id, row.id)? { return Ok(false); } @@ -289,6 +303,7 @@ impl MvStore { /// Inserts a row in the database with new values, previously deleting /// any old data if it existed. Bails on a delete error, e.g. write-write conflict. pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> { + tracing::trace!("upsert(tx_id={}, row.id={:?})", tx_id, row.id); self.delete(tx_id, row.id)?; self.insert(tx_id, row) } @@ -308,6 +323,7 @@ impl MvStore { /// Returns `true` if the row was successfully deleted, and `false` otherwise. /// pub fn delete(&self, tx_id: TxID, id: RowID) -> Result { + tracing::trace!("delete(tx_id={}, id={:?})", tx_id, id); let row_versions_opt = self.rows.get(&id); if let Some(ref row_versions) = row_versions_opt { let mut row_versions = row_versions.value().write().unwrap(); @@ -362,6 +378,7 @@ impl MvStore { /// Returns `Some(row)` with the row data if the row with the given `id` exists, /// and `None` otherwise. pub fn read(&self, tx_id: TxID, id: RowID) -> Result> { + tracing::trace!("read(tx_id={}, id={:?})", tx_id, id); let tx = self.txs.get(&tx_id).unwrap(); let tx = tx.value().read().unwrap(); assert_eq!(tx.state, TransactionState::Active); @@ -382,12 +399,14 @@ impl MvStore { /// Gets all row ids in the database. pub fn scan_row_ids(&self) -> Result> { + tracing::trace!("scan_row_ids"); let keys = self.rows.iter().map(|entry| *entry.key()); Ok(keys.collect()) } /// Gets all row ids in the database for a given table. pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { + tracing::trace!("scan_row_ids_for_table(table_id={})", table_id); Ok(self .rows .range( @@ -412,7 +431,7 @@ impl MvStore { let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); let tx = Transaction::new(tx_id, begin_ts); - tracing::trace!("BEGIN {tx}"); + tracing::trace!("begin_tx(tx_id={})", tx_id); self.txs.insert(tx_id, RwLock::new(tx)); tx_id } @@ -439,7 +458,7 @@ impl MvStore { } } tx.state.store(TransactionState::Preparing); - tracing::trace!("PREPARE {tx}"); + tracing::trace!("prepare_tx(tx_id={})", tx_id); /* TODO: The code we have here is sufficient for snapshot isolation. ** In order to implement serializability, we need the following steps: @@ -515,7 +534,7 @@ impl MvStore { """ */ tx.state.store(TransactionState::Committed(end_ts)); - tracing::trace!("COMMIT {tx}"); + tracing::trace!("commit_tx(tx_id={})", tx_id); let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); drop(tx); // Postprocessing: inserting row versions and logging the transaction to persistent storage. @@ -550,7 +569,7 @@ impl MvStore { } } } - tracing::trace!("UPDATED TX{tx_id}"); + tracing::trace!("updated(tx_id={})", tx_id); // We have now updated all the versions with a reference to the // transaction ID to a timestamp and can, therefore, remove the // transaction. Please note that when we move to lockless, the @@ -563,7 +582,7 @@ impl MvStore { if !log_record.row_versions.is_empty() { self.storage.log_tx(log_record)?; } - tracing::trace!("LOGGED {tx_id}"); + tracing::trace!("logged(tx_id={})", tx_id); Ok(()) } @@ -580,7 +599,7 @@ impl MvStore { let tx = tx_unlocked.value().write().unwrap(); assert_eq!(tx.state, TransactionState::Active); tx.state.store(TransactionState::Aborted); - tracing::trace!("ABORT {tx}"); + tracing::trace!("abort(tx_id={})", tx_id); let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); drop(tx); @@ -596,7 +615,7 @@ impl MvStore { let tx = tx_unlocked.value().read().unwrap(); tx.state.store(TransactionState::Terminated); - tracing::trace!("TERMINATE {tx}"); + tracing::trace!("terminate(tx_id={})", tx_id); // FIXME: verify that we can already remove the transaction here! // Maybe it's fine for snapshot isolation, but too early for serializable? self.txs.remove(&tx_id); @@ -617,7 +636,7 @@ impl MvStore { /// Returns the number of removed versions. pub fn drop_unused_row_versions(&self) -> usize { tracing::trace!( - "Dropping unused row versions. Database stats: transactions: {}; rows: {}", + "drop_unused_row_versions() -> txs: {}; rows: {}", self.txs.len(), self.rows.len() ); @@ -673,7 +692,7 @@ impl MvStore { pub fn recover(&self) -> Result<()> { let tx_log = self.storage.read_tx_log()?; for record in tx_log { - tracing::debug!("RECOVERING {:?}", record); + tracing::debug!("recover() -> tx_timestamp={}", record.tx_timestamp); for version in record.row_versions { self.insert_version(version.row.id, version); } diff --git a/core/mvcc/mod.rs b/core/mvcc/mod.rs index 70d36461f..a333255fa 100644 --- a/core/mvcc/mod.rs +++ b/core/mvcc/mod.rs @@ -37,6 +37,9 @@ pub mod database; pub mod errors; pub mod persistent_storage; +pub use clock::LocalClock; +pub use database::MvStore; + #[cfg(test)] mod tests { use crate::mvcc::clock::LocalClock; diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 924a6eb2c..69418e2a4 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -4,6 +4,7 @@ use crate::storage::pager::Pager; use crate::storage::sqlite3_ondisk::{ read_varint, BTreeCell, PageContent, PageType, TableInteriorCell, TableLeafCell, }; +use crate::MvCursor; use crate::types::{CursorResult, OwnedValue, Record, SeekKey, SeekOp}; use crate::{return_corrupt, LimboError, Result}; @@ -135,6 +136,9 @@ impl CursorState { } pub struct BTreeCursor { + /// The multi-version cursor that is used to read and write to the database file. + mv_cursor: Option>>, + /// The pager that is used to read and write to the database file. pager: Rc, /// Page id of the root page used to go back up fast. root_page: usize, @@ -178,8 +182,13 @@ struct CellArray { } impl BTreeCursor { - pub fn new(pager: Rc, root_page: usize) -> Self { + pub fn new( + mv_cursor: Option>>, + pager: Rc, + root_page: usize, + ) -> Self { Self { + mv_cursor, pager, root_page, rowid: Cell::new(None), @@ -198,6 +207,10 @@ impl BTreeCursor { /// Check if the table is empty. /// This is done by checking if the root page has no cells. fn is_empty_table(&self) -> Result> { + if let Some(mv_cursor) = &self.mv_cursor { + let mv_cursor = mv_cursor.borrow(); + return Ok(CursorResult::Ok(mv_cursor.is_empty())); + } let page = self.pager.read_page(self.root_page)?; return_if_locked!(page); @@ -290,6 +303,19 @@ impl BTreeCursor { &mut self, predicate: Option<(SeekKey<'_>, SeekOp)>, ) -> Result, Option)>> { + if let Some(mv_cursor) = &self.mv_cursor { + let mut mv_cursor = mv_cursor.borrow_mut(); + let rowid = mv_cursor.current_row_id(); + match rowid { + Some(rowid) => { + let record = mv_cursor.current_row().unwrap().unwrap(); + let record: Record = crate::storage::sqlite3_ondisk::read_record(&record.data)?; + mv_cursor.forward(); + return Ok(CursorResult::Ok((Some(rowid.row_id), Some(record)))); + } + None => return Ok(CursorResult::Ok((None, None))), + } + } loop { let mem_page_rc = self.stack.top(); let cell_idx = self.stack.current_cell_index() as usize; @@ -592,6 +618,7 @@ impl BTreeCursor { } pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result> { + assert!(self.mv_cursor.is_none()); // For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers. // B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data. // @@ -1592,15 +1619,22 @@ impl BTreeCursor { } pub fn rewind(&mut self) -> Result> { - self.move_to_root(); + if let Some(_) = &self.mv_cursor { + let (rowid, record) = return_if_io!(self.get_next_record(None)); + self.rowid.replace(rowid); + self.record.replace(record); + } else { + self.move_to_root(); - let (rowid, record) = return_if_io!(self.get_next_record(None)); - self.rowid.replace(rowid); - self.record.replace(record); + let (rowid, record) = return_if_io!(self.get_next_record(None)); + self.rowid.replace(rowid); + self.record.replace(record); + } Ok(CursorResult::Ok(())) } pub fn last(&mut self) -> Result> { + assert!(self.mv_cursor.is_none()); match self.move_to_rightmost()? { CursorResult::Ok(_) => self.prev(), CursorResult::IO => Ok(CursorResult::IO), @@ -1615,6 +1649,7 @@ impl BTreeCursor { } pub fn prev(&mut self) -> Result> { + assert!(self.mv_cursor.is_none()); match self.get_prev_record()? { CursorResult::Ok((rowid, record)) => { self.rowid.replace(rowid); @@ -1631,10 +1666,15 @@ impl BTreeCursor { } pub fn rowid(&self) -> Result> { + if let Some(mv_cursor) = &self.mv_cursor { + let mv_cursor = mv_cursor.borrow(); + return Ok(mv_cursor.current_row_id().map(|rowid| rowid.row_id)); + } Ok(self.rowid.get()) } pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result> { + assert!(self.mv_cursor.is_none()); let (rowid, record) = return_if_io!(self.do_seek(key, op)); self.rowid.replace(rowid); self.record.replace(record); @@ -1648,23 +1688,35 @@ impl BTreeCursor { pub fn insert( &mut self, key: &OwnedValue, - _record: &Record, + record: &Record, moved_before: bool, /* Indicate whether it's necessary to traverse to find the leaf page */ ) -> Result> { let int_key = match key { OwnedValue::Integer(i) => i, _ => unreachable!("btree tables are indexed by integers!"), }; - if !moved_before { - return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ)); - } - - return_if_io!(self.insert_into_page(key, _record)); - self.rowid.replace(Some(*int_key as u64)); + match &self.mv_cursor { + Some(mv_cursor) => { + let row_id = + crate::mvcc::database::RowID::new(self.table_id() as u64, *int_key as u64); + let mut record_buf = Vec::new(); + record.serialize(&mut record_buf); + let row = crate::mvcc::database::Row::new(row_id, record_buf); + mv_cursor.borrow_mut().insert(row).unwrap(); + } + None => { + if !moved_before { + return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ)); + } + return_if_io!(self.insert_into_page(key, record)); + self.rowid.replace(Some(*int_key as u64)); + } + }; Ok(CursorResult::Ok(())) } pub fn delete(&mut self) -> Result> { + assert!(self.mv_cursor.is_none()); let page = self.stack.top(); return_if_locked!(page); @@ -1808,6 +1860,7 @@ impl BTreeCursor { } pub fn exists(&mut self, key: &OwnedValue) -> Result> { + assert!(self.mv_cursor.is_none()); let int_key = match key { OwnedValue::Integer(i) => i, _ => unreachable!("btree tables are indexed by integers!"), @@ -1927,6 +1980,10 @@ impl BTreeCursor { Ok(Some(n_overflow)) } + + pub fn table_id(&self) -> usize { + self.root_page + } } impl PageStack { @@ -2811,7 +2868,7 @@ mod tests { .unwrap(); } let io: Arc = Arc::new(PlatformIO::new().unwrap()); - let db = Database::open_file(io.clone(), path.to_str().unwrap()).unwrap(); + let db = Database::open_file(io.clone(), path.to_str().unwrap(), false).unwrap(); db } @@ -2905,7 +2962,7 @@ mod tests { } fn validate_btree(pager: Rc, page_idx: usize) -> (usize, bool) { - let cursor = BTreeCursor::new(pager.clone(), page_idx); + let cursor = BTreeCursor::new(None, pager.clone(), page_idx); let page = pager.read_page(page_idx).unwrap(); let page = page.get(); let contents = page.contents.as_ref().unwrap(); @@ -2969,7 +3026,7 @@ mod tests { } fn format_btree(pager: Rc, page_idx: usize, depth: usize) -> String { - let cursor = BTreeCursor::new(pager.clone(), page_idx); + let cursor = BTreeCursor::new(None, pager.clone(), page_idx); let page = pager.read_page(page_idx).unwrap(); let page = page.get(); let contents = page.contents.as_ref().unwrap(); @@ -3100,7 +3157,7 @@ mod tests { .as_slice(), ] { let (pager, root_page) = empty_btree(); - let mut cursor = BTreeCursor::new(pager.clone(), root_page); + let mut cursor = BTreeCursor::new(None, pager.clone(), root_page); for (key, size) in sequence.iter() { run_until_done( || { @@ -3151,7 +3208,7 @@ mod tests { tracing::info!("super seed: {}", seed); for _ in 0..attempts { let (pager, root_page) = empty_btree(); - let mut cursor = BTreeCursor::new(pager.clone(), root_page); + let mut cursor = BTreeCursor::new(None, pager.clone(), root_page); let mut keys = Vec::new(); let seed = rng.next_u64(); tracing::info!("seed: {}", seed); @@ -3332,7 +3389,7 @@ mod tests { #[ignore] pub fn test_clear_overflow_pages() -> Result<()> { let (pager, db_header) = setup_test_env(5); - let cursor = BTreeCursor::new(pager.clone(), 1); + let cursor = BTreeCursor::new(None, pager.clone(), 1); let max_local = payload_overflow_threshold_max(PageType::TableLeaf, 4096); let usable_size = cursor.usable_space(); @@ -3430,7 +3487,7 @@ mod tests { #[test] pub fn test_clear_overflow_pages_no_overflow() -> Result<()> { let (pager, db_header) = setup_test_env(5); - let cursor = BTreeCursor::new(pager.clone(), 1); + let cursor = BTreeCursor::new(None, pager.clone(), 1); let small_payload = vec![b'A'; 10]; @@ -3869,7 +3926,7 @@ mod tests { let (pager, root_page) = empty_btree(); let mut keys = Vec::new(); for i in 0..10000 { - let mut cursor = BTreeCursor::new(pager.clone(), root_page); + let mut cursor = BTreeCursor::new(None, pager.clone(), root_page); tracing::info!("INSERT INTO t VALUES ({});", i,); let key = OwnedValue::Integer(i); let value = Record::new(vec![OwnedValue::Integer(i)]); @@ -3893,7 +3950,7 @@ mod tests { format_btree(pager.clone(), root_page, 0) ); for key in keys.iter() { - let mut cursor = BTreeCursor::new(pager.clone(), root_page); + let mut cursor = BTreeCursor::new(None, pager.clone(), root_page); let key = OwnedValue::Integer(*key); let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap(); assert!(exists, "key not found {}", key); diff --git a/core/util.rs b/core/util.rs index b16c29de3..3e141f9a9 100644 --- a/core/util.rs +++ b/core/util.rs @@ -41,8 +41,10 @@ pub fn parse_schema_rows( schema: &mut Schema, io: Arc, syms: &SymbolTable, + mv_tx_id: Option, ) -> Result<()> { if let Some(mut rows) = rows { + rows.set_mv_tx_id(mv_tx_id); let mut automatic_indexes = Vec::new(); loop { match rows.step()? { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 3f16d525c..bf74a4d3a 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -55,7 +55,9 @@ use crate::{ json::json_quote, json::json_remove, json::json_set, json::json_type, }; use crate::{info, CheckpointStatus}; -use crate::{resolve_ext_path, Connection, Result, TransactionState, DATABASE_VERSION}; +use crate::{ + resolve_ext_path, Connection, MvCursor, MvStore, Result, TransactionState, DATABASE_VERSION, +}; use insn::{ exec_add, exec_and, exec_bit_and, exec_bit_not, exec_bit_or, exec_boolean_not, exec_concat, exec_divide, exec_multiply, exec_or, exec_remainder, exec_shift_left, exec_shift_right, @@ -235,6 +237,7 @@ pub struct ProgramState { deferred_seek: Option<(CursorID, CursorID)>, ended_coroutine: Bitfield<4>, // flag to indicate that a coroutine has ended (key is the yield register. currently we assume that the yield register is always between 0-255, YOLO) regex_cache: RegexCache, + pub(crate) mv_tx_id: Option, interrupted: bool, parameters: HashMap, OwnedValue>, halt_state: Option, @@ -254,6 +257,7 @@ impl ProgramState { deferred_seek: None, ended_coroutine: Bitfield::new(), regex_cache: RegexCache::new(), + mv_tx_id: None, interrupted: false, parameters: HashMap::new(), halt_state: None, @@ -358,7 +362,12 @@ impl Program { } #[instrument(skip_all)] - pub fn step(&self, state: &mut ProgramState, pager: Rc) -> Result { + pub fn step( + &self, + state: &mut ProgramState, + mv_store: Option>, + pager: Rc, + ) -> Result { loop { if state.is_interrupted() { return Ok(StepResult::Interrupt); @@ -755,7 +764,18 @@ impl Program { root_page, } => { let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap(); - let cursor = BTreeCursor::new(pager.clone(), *root_page); + let mv_cursor = match state.mv_tx_id { + Some(tx_id) => { + let table_id = *root_page as u64; + let mv_store = mv_store.as_ref().unwrap().clone(); + let mv_cursor = Rc::new(RefCell::new( + MvCursor::new(mv_store, tx_id, table_id).unwrap(), + )); + Some(mv_cursor) + } + None => None, + }; + let cursor = BTreeCursor::new(mv_cursor, pager.clone(), *root_page); let mut cursors = state.cursors.borrow_mut(); match cursor_type { CursorType::BTreeTable(_) => { @@ -1204,36 +1224,49 @@ impl Program { ))); } } - return self.halt(pager, state); + return self.halt(pager, state, mv_store); } Insn::Transaction { write } => { - let connection = self.connection.upgrade().unwrap(); - let current_state = connection.transaction_state.borrow().clone(); - let (new_transaction_state, updated) = match (¤t_state, write) { - (TransactionState::Write, true) => (TransactionState::Write, false), - (TransactionState::Write, false) => (TransactionState::Write, false), - (TransactionState::Read, true) => (TransactionState::Write, true), - (TransactionState::Read, false) => (TransactionState::Read, false), - (TransactionState::None, true) => (TransactionState::Write, true), - (TransactionState::None, false) => (TransactionState::Read, true), - }; - - if updated && matches!(current_state, TransactionState::None) { - if let LimboResult::Busy = pager.begin_read_tx()? { - return Ok(StepResult::Busy); + if let Some(mv_store) = &mv_store { + if state.mv_tx_id.is_none() { + let tx_id = mv_store.begin_tx(); + self.connection + .upgrade() + .unwrap() + .mv_transactions + .borrow_mut() + .push(tx_id); + state.mv_tx_id = Some(tx_id); } - } + } else { + let connection = self.connection.upgrade().unwrap(); + let current_state = connection.transaction_state.borrow().clone(); + let (new_transaction_state, updated) = match (¤t_state, write) { + (TransactionState::Write, true) => (TransactionState::Write, false), + (TransactionState::Write, false) => (TransactionState::Write, false), + (TransactionState::Read, true) => (TransactionState::Write, true), + (TransactionState::Read, false) => (TransactionState::Read, false), + (TransactionState::None, true) => (TransactionState::Write, true), + (TransactionState::None, false) => (TransactionState::Read, true), + }; - if updated && matches!(new_transaction_state, TransactionState::Write) { - if let LimboResult::Busy = pager.begin_write_tx()? { - tracing::trace!("begin_write_tx busy"); - return Ok(StepResult::Busy); + if updated && matches!(current_state, TransactionState::None) { + if let LimboResult::Busy = pager.begin_read_tx()? { + return Ok(StepResult::Busy); + } + } + + if updated && matches!(new_transaction_state, TransactionState::Write) { + if let LimboResult::Busy = pager.begin_write_tx()? { + tracing::trace!("begin_write_tx busy"); + return Ok(StepResult::Busy); + } + } + if updated { + connection + .transaction_state + .replace(new_transaction_state.clone()); } - } - if updated { - connection - .transaction_state - .replace(new_transaction_state.clone()); } state.pc += 1; } @@ -1261,7 +1294,7 @@ impl Program { "cannot commit - no transaction is active".to_string(), )); } - return self.halt(pager, state); + return self.halt(pager, state, mv_store); } Insn::Goto { target_pc } => { assert!(target_pc.is_offset()); @@ -2935,7 +2968,18 @@ impl Program { let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap(); let mut cursors = state.cursors.borrow_mut(); let is_index = cursor_type.is_index(); - let cursor = BTreeCursor::new(pager.clone(), *root_page); + let mv_cursor = match state.mv_tx_id { + Some(tx_id) => { + let table_id = *root_page as u64; + let mv_store = mv_store.as_ref().unwrap().clone(); + let mv_cursor = Rc::new(RefCell::new( + MvCursor::new(mv_store, tx_id, table_id).unwrap(), + )); + Some(mv_cursor) + } + None => None, + }; + let cursor = BTreeCursor::new(mv_cursor, pager.clone(), *root_page); if is_index { cursors .get_mut(*cursor_id) @@ -3014,6 +3058,7 @@ impl Program { &mut schema, conn.pager.io.clone(), &conn.syms.borrow(), + state.mv_tx_id, )?; state.pc += 1; } @@ -3084,41 +3129,59 @@ impl Program { } } - fn halt(&self, pager: Rc, program_state: &mut ProgramState) -> Result { - let connection = self - .connection - .upgrade() - .expect("only weak ref to connection?"); - let auto_commit = *connection.auto_commit.borrow(); - tracing::trace!("Halt auto_commit {}", auto_commit); - assert!( - program_state.halt_state.is_none() - || (matches!(program_state.halt_state.unwrap(), HaltState::Checkpointing)) - ); - if program_state.halt_state.is_some() { - self.step_end_write_txn(&pager, &mut program_state.halt_state, connection.deref()) - } else { + fn halt( + &self, + pager: Rc, + program_state: &mut ProgramState, + mv_store: Option>, + ) -> Result { + if let Some(mv_store) = mv_store { + let conn = self.connection.upgrade().unwrap(); + let auto_commit = *conn.auto_commit.borrow(); if auto_commit { - let current_state = connection.transaction_state.borrow().clone(); - match current_state { - TransactionState::Write => self.step_end_write_txn( - &pager, - &mut program_state.halt_state, - connection.deref(), - ), - TransactionState::Read => { - pager.end_read_tx()?; - Ok(StepResult::Done) - } - TransactionState::None => Ok(StepResult::Done), + let mut mv_transactions = conn.mv_transactions.borrow_mut(); + for tx_id in mv_transactions.iter() { + mv_store.commit_tx(*tx_id).unwrap(); } + mv_transactions.clear(); + } + return Ok(StepResult::Done); + } else { + let connection = self + .connection + .upgrade() + .expect("only weak ref to connection?"); + let auto_commit = *connection.auto_commit.borrow(); + tracing::trace!("Halt auto_commit {}", auto_commit); + assert!( + program_state.halt_state.is_none() + || (matches!(program_state.halt_state.unwrap(), HaltState::Checkpointing)) + ); + if program_state.halt_state.is_some() { + self.step_end_write_txn(&pager, &mut program_state.halt_state, connection.deref()) } else { - if self.change_cnt_on { - if let Some(conn) = self.connection.upgrade() { - conn.set_changes(self.n_change.get()); + if auto_commit { + let current_state = connection.transaction_state.borrow().clone(); + match current_state { + TransactionState::Write => self.step_end_write_txn( + &pager, + &mut program_state.halt_state, + connection.deref(), + ), + TransactionState::Read => { + pager.end_read_tx()?; + Ok(StepResult::Done) + } + TransactionState::None => Ok(StepResult::Done), } + } else { + if self.change_cnt_on { + if let Some(conn) = self.connection.upgrade() { + conn.set_changes(self.n_change.get()); + } + } + Ok(StepResult::Done) } - Ok(StepResult::Done) } } } diff --git a/simulator/main.rs b/simulator/main.rs index a2f07d95e..d28c2b017 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -308,8 +308,12 @@ fn doublecheck( ) { { let mut env_ = env.lock().unwrap(); - env_.db = - Database::open_file(env_.io.clone(), paths.doublecheck_db.to_str().unwrap()).unwrap(); + env_.db = Database::open_file( + env_.io.clone(), + paths.doublecheck_db.to_str().unwrap(), + false, + ) + .unwrap(); } // Run the simulation again diff --git a/simulator/runner/env.rs b/simulator/runner/env.rs index 1b7240b9b..a9409ad7e 100644 --- a/simulator/runner/env.rs +++ b/simulator/runner/env.rs @@ -87,7 +87,7 @@ impl SimulatorEnv { std::fs::remove_file(db_path).unwrap(); } - let db = match Database::open_file(io.clone(), db_path.to_str().unwrap()) { + let db = match Database::open_file(io.clone(), db_path.to_str().unwrap(), false) { Ok(db) => db, Err(e) => { panic!("error opening simulator test file {:?}: {:?}", db_path, e); diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 43d1ff0fc..c1cab5eb0 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -116,7 +116,7 @@ pub unsafe extern "C" fn sqlite3_open( Err(_) => return SQLITE_MISUSE, }, }; - match limbo_core::Database::open_file(io, filename) { + match limbo_core::Database::open_file(io, filename, false) { Ok(db) => { let conn = db.connect().unwrap(); *db_out = Box::leak(Box::new(sqlite3::new(db, conn))); diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 1831abbd0..0f0d550b9 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -42,7 +42,7 @@ impl TempDatabase { pub fn connect_limbo(&self) -> Rc { log::debug!("conneting to limbo"); - let db = Database::open_file(self.io.clone(), self.path.to_str().unwrap()).unwrap(); + let db = Database::open_file(self.io.clone(), self.path.to_str().unwrap(), false).unwrap(); let conn = db.connect().unwrap(); log::debug!("connected to limbo"); @@ -51,7 +51,7 @@ impl TempDatabase { pub fn limbo_database(&self) -> Arc { log::debug!("conneting to limbo"); - Database::open_file(self.io.clone(), self.path.to_str().unwrap()).unwrap() + Database::open_file(self.io.clone(), self.path.to_str().unwrap(), false).unwrap() } }