diff --git a/core/benches/benchmark.rs b/core/benches/benchmark.rs
index 5db976086..51b844aad 100644
--- a/core/benches/benchmark.rs
+++ b/core/benches/benchmark.rs
@@ -2,7 +2,7 @@ use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criteri
 use pprof::criterion::{Output, PProfProfiler};
 use regex::Regex;
 use std::{sync::Arc, time::Instant};
-use turso_core::{Database, PlatformIO};
+use turso_core::{Database, LimboError, PlatformIO, StepResult};
 
 #[cfg(not(target_family = "wasm"))]
 #[global_allocator]
@@ -631,9 +631,303 @@ fn bench_insert_rows(criterion: &mut Criterion) {
     group.finish();
 }
 
+/// Drive `num_connections` turso connections through batched INSERTs on one
+/// thread: step every in-flight statement, then run I/O once per round.
+#[inline(never)]
+fn bench_limbo(
+    mvcc: bool,
+    num_connections: i64,
+    num_batch_inserts: i64,
+    num_inserts_per_batch: usize,
+) {
+    struct ConnectionState {
+        conn: Arc<turso_core::Connection>,
+        inserts: Vec<String>,
+        current_statement: Option<turso_core::Statement>,
+    }
+    #[allow(clippy::arc_with_non_send_sync)]
+    let io = Arc::new(PlatformIO::new().unwrap());
+    let temp_dir = tempfile::tempdir().unwrap();
+    let path = temp_dir.path().join("bench.db");
+    let db = Database::open_file(io.clone(), path.to_str().unwrap(), mvcc, false).unwrap();
+    let mut connections = Vec::new();
+    {
+        // Set up the schema on a throwaway connection.
+        let conn = db.connect().unwrap();
+        conn.execute("CREATE TABLE test (x)").unwrap();
+        conn.close().unwrap();
+    }
+    let inserts =
+        generate_inserts_per_connection(num_connections, num_batch_inserts, num_inserts_per_batch);
+    for i in 0..num_connections {
+        let conn = db.connect().unwrap();
+        let inserts = inserts[i as usize].clone();
+        connections.push(ConnectionState {
+            conn,
+            inserts,
+            current_statement: None,
+        });
+    }
+    loop {
+        let mut all_finished = true;
+        for conn in &mut connections {
+            if !conn.inserts.is_empty() || conn.current_statement.is_some() {
+                all_finished = false;
+                break;
+            }
+        }
+        for conn in connections.iter_mut() {
+            if conn.current_statement.is_none() && !conn.inserts.is_empty() {
+                let write = conn.inserts.pop().unwrap();
+                conn.current_statement = Some(conn.conn.prepare(&write).unwrap());
+            }
+            if conn.current_statement.is_none() {
+                continue;
+            }
+            let stmt = conn.current_statement.as_mut().unwrap();
+            match stmt.step().unwrap() {
+                // These should be the only possible cases for concurrent writes:
+                // no Row because INSERT returns no rows, and no Interrupt
+                // because nothing interrupts the statement.
+                StepResult::Done => {
+                    conn.current_statement = None;
+                }
+                StepResult::IO => {
+                    // Skip running I/O here; it is driven once for all
+                    // connections by `db.io.step()` below.
+                }
+                StepResult::Busy => {
+                    // MVCC supports multiple concurrent writers, so Busy can
+                    // only happen in WAL mode; restart the statement.
+                    if mvcc {
+                        unreachable!();
+                    }
+                    stmt.reset();
+                }
+                _ => {
+                    unreachable!()
+                }
+            }
+        }
+        db.io.step().unwrap();
+
+        if all_finished {
+            break;
+        }
+    }
+}
+
+/// Same as [`bench_limbo`] but wraps every batch in BEGIN CONCURRENT /
+/// COMMIT so MVCC connections write concurrently.
+#[inline(never)]
+fn bench_limbo_mvcc(
+    mvcc: bool,
+    num_connections: i64,
+    num_batch_inserts: i64,
+    num_inserts_per_batch: usize,
+) {
+    struct ConnectionState {
+        conn: Arc<turso_core::Connection>,
+        inserts: Vec<String>,
+        current_statement: Option<turso_core::Statement>,
+        // SQL of the in-flight insert, kept so it can be re-prepared when the
+        // schema changes under us (LimboError::SchemaUpdated).
+        current_insert: Option<String>,
+        // True while `current_statement` is the COMMIT of the current batch;
+        // distinguishes "insert finished, now commit" from "commit finished".
+        committing: bool,
+    }
+    #[allow(clippy::arc_with_non_send_sync)]
+    let io = Arc::new(PlatformIO::new().unwrap());
+    let temp_dir = tempfile::tempdir().unwrap();
+    let path = temp_dir.path().join("bench.db");
+    let db = Database::open_file(io.clone(), path.to_str().unwrap(), mvcc, false).unwrap();
+    let mut connections = Vec::new();
+    let conn0 = db.connect().unwrap();
+    conn0.execute("CREATE TABLE test (x)").unwrap();
+
+    let inserts =
+        generate_inserts_per_connection(num_connections, num_batch_inserts, num_inserts_per_batch);
+    for i in 0..num_connections {
+        let conn = db.connect().unwrap();
+        let inserts = inserts[i as usize].clone();
+        connections.push(ConnectionState {
+            conn,
+            inserts,
+            current_statement: None,
+            current_insert: None,
+            committing: false,
+        });
+    }
+    loop {
+        let mut all_finished = true;
+        for conn in &mut connections {
+            if !conn.inserts.is_empty() || conn.current_statement.is_some() {
+                all_finished = false;
+                break;
+            }
+        }
+        for conn in connections.iter_mut() {
+            if conn.current_statement.is_none() && !conn.inserts.is_empty() {
+                let write = conn.inserts.pop().unwrap();
+                conn.conn.execute("BEGIN CONCURRENT").unwrap();
+                conn.current_statement = Some(conn.conn.prepare(&write).unwrap());
+                conn.current_insert = Some(write);
+            }
+            if conn.current_statement.is_none() {
+                continue;
+            }
+            let stmt = conn.current_statement.as_mut().unwrap();
+            match stmt.step() {
+                // These should be the only possible cases for concurrent
+                // writes: no Row because INSERT returns no rows, and no
+                // Interrupt because nothing interrupts the statement.
+                Ok(StepResult::Done) => {
+                    if conn.committing {
+                        // COMMIT finished: the batch is done.
+                        conn.committing = false;
+                        conn.current_statement = None;
+                        conn.current_insert = None;
+                    } else {
+                        // The INSERT finished inside BEGIN CONCURRENT; now
+                        // commit the batch.
+                        conn.current_statement = Some(conn.conn.prepare("COMMIT").unwrap());
+                        conn.committing = true;
+                    }
+                }
+                Ok(StepResult::IO) => {
+                    // Skip running I/O here; it is driven once for all
+                    // connections by `db.io.step()` below.
+                }
+                Ok(StepResult::Busy) => {
+                    // MVCC supports multiple concurrent writers, so Busy can
+                    // only happen in WAL mode; restart the statement.
+                    if mvcc {
+                        unreachable!();
+                    }
+                    stmt.reset();
+                }
+                Err(LimboError::SchemaUpdated) => {
+                    // The schema changed since the statement was prepared;
+                    // re-prepare the same insert and retry.
+                    conn.current_statement = Some(
+                        conn.conn
+                            .prepare(conn.current_insert.as_deref().unwrap())
+                            .unwrap(),
+                    );
+                    continue;
+                }
+                Err(err) => panic!("unexpected error: {err:?}"),
+                _ => {
+                    unreachable!()
+                }
+            }
+        }
+        db.io.step().unwrap();
+
+        if all_finished {
+            break;
+        }
+    }
+}
+
+/// Pre-generate the batch-INSERT statements for every connection so that SQL
+/// string construction is not measured inside the benchmark loop.
+fn generate_inserts_per_connection(
+    num_connections: i64,
+    num_batch_inserts: i64,
+    num_inserts_per_batch: usize,
+) -> Vec<Vec<String>> {
+    let mut inserts = Vec::with_capacity(num_connections as usize);
+    for i in 0..num_connections {
+        let mut inserts_per_connection = Vec::with_capacity(num_batch_inserts as usize);
+        for j in 0..num_batch_inserts {
+            // NOTE: `num_batch_inserts * (i + j)` produces overlapping value
+            // ranges across connections; that is harmless here because the
+            // table has no uniqueness constraint.
+            inserts_per_connection.push(generate_batch_insert(
+                num_batch_inserts * (i + j),
+                num_inserts_per_batch,
+            ));
+        }
+        inserts.push(inserts_per_connection);
+    }
+    inserts
+}
+
+/// Build `INSERT INTO test (x) VALUES (start),(start+1),…` with `num` rows.
+fn generate_batch_insert(start: i64, num: usize) -> String {
+    let mut inserts = String::from("INSERT INTO test (x) VALUES ");
+    for i in 0..num {
+        inserts.push_str(&format!("({})", start + i as i64));
+        if i < num - 1 {
+            inserts.push(',');
+        }
+    }
+    inserts
+}
+
+/// Compare concurrent batched-insert throughput: turso WAL vs turso MVCC vs
+/// rusqlite, each with the same pre-generated workload.
+fn bench_concurrent_writes(criterion: &mut Criterion) {
+    let mut group = criterion.benchmark_group("Concurrent writes");
+
+    let num_connections = 10;
+    let num_batch_inserts = 100;
+    let num_inserts_per_batch = 200_usize;
+
+    group.bench_function("limbo_wal_concurrent_writes", |b| {
+        b.iter(|| {
+            bench_limbo(
+                false,
+                num_connections,
+                num_batch_inserts,
+                num_inserts_per_batch,
+            );
+        });
+    });
+    group.bench_function("limbo_mvcc_concurrent_writes", |b| {
+        b.iter(|| {
+            bench_limbo_mvcc(
+                true,
+                num_connections,
+                num_batch_inserts,
+                num_inserts_per_batch,
+            );
+        });
+    });
+    group.bench_function("sqlite_concurrent_writes", |b| {
+        let inserts = generate_inserts_per_connection(
+            num_connections,
+            num_batch_inserts,
+            num_inserts_per_batch,
+        );
+        b.iter(|| {
+            let temp_dir = tempfile::tempdir().unwrap();
+            let path = temp_dir.path().join("bench.db");
+            {
+                let conn = rusqlite::Connection::open(path.to_str().unwrap()).unwrap();
+                conn.pragma_update(None, "synchronous", "FULL").unwrap();
+                conn.pragma_update(None, "journal_mode", "WAL").unwrap();
+                conn.pragma_update(None, "locking_mode", "EXCLUSIVE")
+                    .unwrap();
+                conn.execute("CREATE TABLE test (x INTEGER)", []).unwrap();
+            }
+
+            for i in 0..num_connections {
+                let conn = rusqlite::Connection::open(path.to_str().unwrap()).unwrap();
+                for j in 0..num_batch_inserts {
+                    conn.execute(&inserts[i as usize][j as usize], []).unwrap();
+                }
+            }
+        });
+    });
+    group.finish();
+}
+
 criterion_group! {
     name = benches;
     config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
-    targets = bench_open, bench_alter, bench_prepare_query, bench_execute_select_1, bench_execute_select_rows, bench_execute_select_count, bench_insert_rows
+    targets = bench_open, bench_alter, bench_prepare_query, bench_execute_select_1, bench_execute_select_rows, bench_execute_select_count, bench_insert_rows, bench_concurrent_writes
 }
 criterion_main!(benches);