diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 39706fdd5..6887d418e 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -40,6 +40,7 @@ pub mod value; use transaction::TransactionBehavior; #[cfg(feature = "conn_raw_api")] use turso_core::types::WalFrameInfo; +use turso_core::MvccMode; pub use value::Value; pub use params::params_from_iter; @@ -82,6 +83,7 @@ pub type Result = std::result::Result; pub struct Builder { path: String, enable_mvcc: bool, + mvcc_mode: MvccMode, vfs: Option, } @@ -91,12 +93,14 @@ impl Builder { Self { path: path.to_string(), enable_mvcc: false, + mvcc_mode: MvccMode::Noop, vfs: None, } } - pub fn with_mvcc(mut self, mvcc_enabled: bool) -> Self { + pub fn with_mvcc(mut self, mvcc_enabled: bool, mvcc_mode: MvccMode) -> Self { self.enable_mvcc = mvcc_enabled; + self.mvcc_mode = mvcc_mode; self } @@ -109,7 +113,13 @@ impl Builder { #[allow(unused_variables, clippy::arc_with_non_send_sync)] pub async fn build(self) -> Result { let io = self.get_io()?; - let db = turso_core::Database::open_file(io, self.path.as_str(), self.enable_mvcc, true)?; + let db = turso_core::Database::open_file( + io, + self.path.as_str(), + self.enable_mvcc, + true, + self.mvcc_mode, + )?; Ok(Database { inner: db }) } diff --git a/cli/app.rs b/cli/app.rs index 30b5d6ec6..2565bbe8e 100644 --- a/cli/app.rs +++ b/cli/app.rs @@ -375,7 +375,7 @@ impl Limbo { }; ( io.clone(), - Database::open_file(io.clone(), path, false, false)?, + Database::open_file(io.clone(), path, false, false, turso_core::MvccMode::Noop)?, ) }; self.io = io; @@ -1656,7 +1656,13 @@ impl Limbo { anyhow::bail!("Refusing to overwrite existing file: {output_file}"); } let io: Arc = Arc::new(turso_core::PlatformIO::new()?); - let db = Database::open_file(io.clone(), output_file, false, true)?; + let db = Database::open_file( + io.clone(), + output_file, + false, + true, + turso_core::MvccMode::Noop, + )?; let target = db.connect()?; let mut applier = ApplyWriter::new(&target); diff --git a/core/ext/mod.rs b/core/ext/mod.rs index 1d73c3ba2..e6821f4b5 100644 --- a/core/ext/mod.rs +++ b/core/ext/mod.rs @@ -155,7 +155,7 @@ impl Database { } }, }; - let db = Self::open_file(io.clone(), path, false, false)?; + let db = Self::open_file(io.clone(), path, false, false, crate::MvccMode::Noop)?; Ok((io, db)) } diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index 972d6797b..fdb8e210e 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -1115,7 +1115,7 @@ impl DbspCompiler { // Create an internal connection for expression compilation let io = Arc::new(MemoryIO::new()); - let db = Database::open_file(io, ":memory:", false, false)?; + let db = Database::open_file(io, ":memory:", false, false, crate::MvccMode::Noop)?; let internal_conn = db.connect()?; internal_conn.query_only.set(true); internal_conn.auto_commit.set(false); diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 677d26aa4..e61e36983 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -1018,8 +1018,11 @@ impl ProjectOperator { // Set up internal connection for expression evaluation let io = Arc::new(crate::MemoryIO::new()); let db = Database::open_file( - io, ":memory:", false, // no MVCC needed for expression evaluation + io, + ":memory:", + false, // no MVCC needed for expression evaluation false, // no indexes needed + crate::MvccMode::Noop, )?; let internal_conn = db.connect()?; // Set to read-only mode and disable auto-commit since we're only evaluating expressions @@ -1132,8 +1135,11 @@ impl ProjectOperator { // Set up internal connection for expression evaluation let io = Arc::new(crate::MemoryIO::new()); let db = Database::open_file( - io, ":memory:", false, // no MVCC needed for expression evaluation + io, + ":memory:", + false, // no MVCC needed for expression evaluation false, // no indexes needed + crate::MvccMode::Noop, )?; let internal_conn = db.connect()?; // Set to read-only mode and disable auto-commit since we're only evaluating expressions diff --git a/core/lib.rs b/core/lib.rs index ab9c768d3..7ee46c078 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -99,10 +99,17 @@ use util::parse_schema_rows; pub use util::IOExt; pub use vdbe::{builder::QueryMode, explain::EXPLAIN_COLUMNS, explain::EXPLAIN_QUERY_PLAN_COLUMNS}; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MvccMode { + Noop, + LogicalLog, +} + /// Configuration for database features #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct DatabaseOpts { pub enable_mvcc: bool, + pub mvcc_mode: MvccMode, pub enable_indexes: bool, pub enable_views: bool, pub enable_strict: bool, @@ -112,6 +119,7 @@ impl Default for DatabaseOpts { fn default() -> Self { Self { enable_mvcc: false, + mvcc_mode: MvccMode::Noop, enable_indexes: true, enable_views: false, enable_strict: false, @@ -129,6 +137,11 @@ impl DatabaseOpts { self } + pub fn with_mvcc_mode(mut self, mvcc_mode: MvccMode) -> Self { + self.mvcc_mode = mvcc_mode; + self + } + pub fn with_indexes(mut self, enable: bool) -> Self { self.enable_indexes = enable; self @@ -267,6 +280,7 @@ impl Database { path: &str, enable_mvcc: bool, enable_indexes: bool, + mvcc_mode: MvccMode, ) -> Result> { Self::open_file_with_flags( io, @@ -274,6 +288,7 @@ impl Database { OpenFlags::default(), DatabaseOpts::new() .with_mvcc(enable_mvcc) + .with_mvcc_mode(mvcc_mode) .with_indexes(enable_indexes), None, ) @@ -393,10 +408,13 @@ impl Database { let shared_wal = WalFileShared::open_shared_if_exists(&io, wal_path)?; let mv_store = if opts.enable_mvcc { - Some(Arc::new(MvStore::new( - mvcc::LocalClock::new(), - mvcc::persistent_storage::Storage::new_noop(), - ))) + let storage = match opts.mvcc_mode { + MvccMode::Noop => mvcc::persistent_storage::Storage::new_noop(), + MvccMode::LogicalLog => mvcc::persistent_storage::Storage::new_logical_log( + io.open_file(&format!("{path}-lg"), OpenFlags::default(), false)?, + ), + }; + Some(Arc::new(MvStore::new(mvcc::LocalClock::new(), storage))) } else { None }; diff --git a/core/mvcc/persistent_storage/mod.rs b/core/mvcc/persistent_storage/mod.rs index 3dbd891a0..19ef799c4 100644 --- a/core/mvcc/persistent_storage/mod.rs +++ b/core/mvcc/persistent_storage/mod.rs @@ -1,23 +1,31 @@ use std::fmt::Debug; +use std::sync::Arc; use crate::mvcc::database::LogRecord; -use crate::{LimboError, Result}; +use crate::{File, LimboError, Result}; -#[derive(Debug)] pub enum Storage { Noop, + LogicalLog { file: Arc }, } impl Storage { pub fn new_noop() -> Self { Self::Noop } + + pub fn new_logical_log(file: Arc) -> Self { + Self::LogicalLog { file } + } } impl Storage { pub fn log_tx(&self, _m: LogRecord) -> Result<()> { match self { Self::Noop => (), + Self::LogicalLog { file } => { + todo!() + } } Ok(()) } @@ -27,6 +35,16 @@ impl Storage { Self::Noop => Err(LimboError::InternalError( "cannot read from Noop storage".to_string(), )), + Self::LogicalLog { file } => todo!(), + } + } +} + +impl Debug for Storage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Noop => write!(f, "Noop"), + Self::LogicalLog { file: _ } => write!(f, "LogicalLog {{ file }}"), } } } diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs index 61bd35ed0..4b3a2dc97 100644 --- a/perf/throughput/turso/src/main.rs +++ b/perf/throughput/turso/src/main.rs @@ -130,7 +130,10 @@ async fn setup_database(db_path: &str, mode: TransactionMode) -> Result builder.build().await?, TransactionMode::Mvcc | TransactionMode::Concurrent => { - builder.with_mvcc(true).build().await? + builder + .with_mvcc(true, turso::MvccMode::LogicalLog) + .build() + .await? } }; let conn = db.connect()?; diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index b8667da2a..6c94fd559 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -594,7 +594,7 @@ async fn multiple_connections_fuzz(opts: FuzzOptions) { // Create a fresh database for each iteration let tempfile = tempfile::NamedTempFile::new().unwrap(); let db = Builder::new_local(tempfile.path().to_str().unwrap()) - .with_mvcc(opts.mvcc_enabled) + .with_mvcc(opts.mvcc_enabled, turso::MvccMode::Noop) .build() .await .unwrap();