mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-31 13:54:27 +01:00
core/mvcc: introduce MvccMode Logical Log
This commit is contained in:
@@ -40,6 +40,7 @@ pub mod value;
|
||||
use transaction::TransactionBehavior;
|
||||
#[cfg(feature = "conn_raw_api")]
|
||||
use turso_core::types::WalFrameInfo;
|
||||
use turso_core::MvccMode;
|
||||
pub use value::Value;
|
||||
|
||||
pub use params::params_from_iter;
|
||||
@@ -82,6 +83,7 @@ pub type Result<T> = std::result::Result<T, Error>;
|
||||
pub struct Builder {
|
||||
path: String,
|
||||
enable_mvcc: bool,
|
||||
mvcc_mode: MvccMode,
|
||||
vfs: Option<String>,
|
||||
}
|
||||
|
||||
@@ -91,12 +93,14 @@ impl Builder {
|
||||
Self {
|
||||
path: path.to_string(),
|
||||
enable_mvcc: false,
|
||||
mvcc_mode: MvccMode::Noop,
|
||||
vfs: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_mvcc(mut self, mvcc_enabled: bool) -> Self {
|
||||
pub fn with_mvcc(mut self, mvcc_enabled: bool, mvcc_mode: MvccMode) -> Self {
|
||||
self.enable_mvcc = mvcc_enabled;
|
||||
self.mvcc_mode = mvcc_mode;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -109,7 +113,13 @@ impl Builder {
|
||||
#[allow(unused_variables, clippy::arc_with_non_send_sync)]
|
||||
pub async fn build(self) -> Result<Database> {
|
||||
let io = self.get_io()?;
|
||||
let db = turso_core::Database::open_file(io, self.path.as_str(), self.enable_mvcc, true)?;
|
||||
let db = turso_core::Database::open_file(
|
||||
io,
|
||||
self.path.as_str(),
|
||||
self.enable_mvcc,
|
||||
true,
|
||||
self.mvcc_mode,
|
||||
)?;
|
||||
Ok(Database { inner: db })
|
||||
}
|
||||
|
||||
|
||||
10
cli/app.rs
10
cli/app.rs
@@ -375,7 +375,7 @@ impl Limbo {
|
||||
};
|
||||
(
|
||||
io.clone(),
|
||||
Database::open_file(io.clone(), path, false, false)?,
|
||||
Database::open_file(io.clone(), path, false, false, turso_core::MvccMode::Noop)?,
|
||||
)
|
||||
};
|
||||
self.io = io;
|
||||
@@ -1656,7 +1656,13 @@ impl Limbo {
|
||||
anyhow::bail!("Refusing to overwrite existing file: {output_file}");
|
||||
}
|
||||
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new()?);
|
||||
let db = Database::open_file(io.clone(), output_file, false, true)?;
|
||||
let db = Database::open_file(
|
||||
io.clone(),
|
||||
output_file,
|
||||
false,
|
||||
true,
|
||||
turso_core::MvccMode::Noop,
|
||||
)?;
|
||||
let target = db.connect()?;
|
||||
|
||||
let mut applier = ApplyWriter::new(&target);
|
||||
|
||||
@@ -155,7 +155,7 @@ impl Database {
|
||||
}
|
||||
},
|
||||
};
|
||||
let db = Self::open_file(io.clone(), path, false, false)?;
|
||||
let db = Self::open_file(io.clone(), path, false, false, crate::MvccMode::Noop)?;
|
||||
Ok((io, db))
|
||||
}
|
||||
|
||||
|
||||
@@ -1115,7 +1115,7 @@ impl DbspCompiler {
|
||||
|
||||
// Create an internal connection for expression compilation
|
||||
let io = Arc::new(MemoryIO::new());
|
||||
let db = Database::open_file(io, ":memory:", false, false)?;
|
||||
let db = Database::open_file(io, ":memory:", false, false, crate::MvccMode::Noop)?;
|
||||
let internal_conn = db.connect()?;
|
||||
internal_conn.query_only.set(true);
|
||||
internal_conn.auto_commit.set(false);
|
||||
|
||||
@@ -1018,8 +1018,11 @@ impl ProjectOperator {
|
||||
// Set up internal connection for expression evaluation
|
||||
let io = Arc::new(crate::MemoryIO::new());
|
||||
let db = Database::open_file(
|
||||
io, ":memory:", false, // no MVCC needed for expression evaluation
|
||||
io,
|
||||
":memory:",
|
||||
false, // no MVCC needed for expression evaluation
|
||||
false, // no indexes needed
|
||||
crate::MvccMode::Noop,
|
||||
)?;
|
||||
let internal_conn = db.connect()?;
|
||||
// Set to read-only mode and disable auto-commit since we're only evaluating expressions
|
||||
@@ -1132,8 +1135,11 @@ impl ProjectOperator {
|
||||
// Set up internal connection for expression evaluation
|
||||
let io = Arc::new(crate::MemoryIO::new());
|
||||
let db = Database::open_file(
|
||||
io, ":memory:", false, // no MVCC needed for expression evaluation
|
||||
io,
|
||||
":memory:",
|
||||
false, // no MVCC needed for expression evaluation
|
||||
false, // no indexes needed
|
||||
crate::MvccMode::Noop,
|
||||
)?;
|
||||
let internal_conn = db.connect()?;
|
||||
// Set to read-only mode and disable auto-commit since we're only evaluating expressions
|
||||
|
||||
26
core/lib.rs
26
core/lib.rs
@@ -99,10 +99,17 @@ use util::parse_schema_rows;
|
||||
pub use util::IOExt;
|
||||
pub use vdbe::{builder::QueryMode, explain::EXPLAIN_COLUMNS, explain::EXPLAIN_QUERY_PLAN_COLUMNS};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum MvccMode {
|
||||
Noop,
|
||||
LogicalLog,
|
||||
}
|
||||
|
||||
/// Configuration for database features
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct DatabaseOpts {
|
||||
pub enable_mvcc: bool,
|
||||
pub mvcc_mode: MvccMode,
|
||||
pub enable_indexes: bool,
|
||||
pub enable_views: bool,
|
||||
pub enable_strict: bool,
|
||||
@@ -112,6 +119,7 @@ impl Default for DatabaseOpts {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enable_mvcc: false,
|
||||
mvcc_mode: MvccMode::Noop,
|
||||
enable_indexes: true,
|
||||
enable_views: false,
|
||||
enable_strict: false,
|
||||
@@ -129,6 +137,11 @@ impl DatabaseOpts {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_mvcc_mode(mut self, mvcc_mode: MvccMode) -> Self {
|
||||
self.mvcc_mode = mvcc_mode;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_indexes(mut self, enable: bool) -> Self {
|
||||
self.enable_indexes = enable;
|
||||
self
|
||||
@@ -267,6 +280,7 @@ impl Database {
|
||||
path: &str,
|
||||
enable_mvcc: bool,
|
||||
enable_indexes: bool,
|
||||
mvcc_mode: MvccMode,
|
||||
) -> Result<Arc<Database>> {
|
||||
Self::open_file_with_flags(
|
||||
io,
|
||||
@@ -274,6 +288,7 @@ impl Database {
|
||||
OpenFlags::default(),
|
||||
DatabaseOpts::new()
|
||||
.with_mvcc(enable_mvcc)
|
||||
.with_mvcc_mode(mvcc_mode)
|
||||
.with_indexes(enable_indexes),
|
||||
None,
|
||||
)
|
||||
@@ -393,10 +408,13 @@ impl Database {
|
||||
let shared_wal = WalFileShared::open_shared_if_exists(&io, wal_path)?;
|
||||
|
||||
let mv_store = if opts.enable_mvcc {
|
||||
Some(Arc::new(MvStore::new(
|
||||
mvcc::LocalClock::new(),
|
||||
mvcc::persistent_storage::Storage::new_noop(),
|
||||
)))
|
||||
let storage = match opts.mvcc_mode {
|
||||
MvccMode::Noop => mvcc::persistent_storage::Storage::new_noop(),
|
||||
MvccMode::LogicalLog => mvcc::persistent_storage::Storage::new_logical_log(
|
||||
io.open_file(&format!("{path}-lg"), OpenFlags::default(), false)?,
|
||||
),
|
||||
};
|
||||
Some(Arc::new(MvStore::new(mvcc::LocalClock::new(), storage)))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
@@ -1,23 +1,31 @@
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::mvcc::database::LogRecord;
|
||||
use crate::{LimboError, Result};
|
||||
use crate::{File, LimboError, Result};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Storage {
|
||||
Noop,
|
||||
LogicalLog { file: Arc<dyn File> },
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub fn new_noop() -> Self {
|
||||
Self::Noop
|
||||
}
|
||||
|
||||
pub fn new_logical_log(file: Arc<dyn File>) -> Self {
|
||||
Self::LogicalLog { file }
|
||||
}
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub fn log_tx(&self, _m: LogRecord) -> Result<()> {
|
||||
match self {
|
||||
Self::Noop => (),
|
||||
Self::LogicalLog { file } => {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -27,6 +35,16 @@ impl Storage {
|
||||
Self::Noop => Err(LimboError::InternalError(
|
||||
"cannot read from Noop storage".to_string(),
|
||||
)),
|
||||
Self::LogicalLog { file } => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for Storage {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Noop => write!(f, "Noop"),
|
||||
Self::LogicalLog { file: _ } => write!(f, "LogicalLog {{ file }}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,7 +130,10 @@ async fn setup_database(db_path: &str, mode: TransactionMode) -> Result<Database
|
||||
let db = match mode {
|
||||
TransactionMode::Legacy => builder.build().await?,
|
||||
TransactionMode::Mvcc | TransactionMode::Concurrent => {
|
||||
builder.with_mvcc(true).build().await?
|
||||
builder
|
||||
.with_mvcc(true, turso::MvccMode::LogicalLog)
|
||||
.build()
|
||||
.await?
|
||||
}
|
||||
};
|
||||
let conn = db.connect()?;
|
||||
|
||||
@@ -594,7 +594,7 @@ async fn multiple_connections_fuzz(opts: FuzzOptions) {
|
||||
// Create a fresh database for each iteration
|
||||
let tempfile = tempfile::NamedTempFile::new().unwrap();
|
||||
let db = Builder::new_local(tempfile.path().to_str().unwrap())
|
||||
.with_mvcc(opts.mvcc_enabled)
|
||||
.with_mvcc(opts.mvcc_enabled, turso::MvccMode::Noop)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user