Merge 'benchmark: introduce simple 1 thread concurrent benchmark for mvcc/sqlite/wal' from Pere Diaz Bou
This is considerably simpler with 1 thread as we just try to yield
control when I/O happens and we only run io.run_once when all
connections tried to do some work. This allows connections to
cooperatively progress.

Closes #3060
This commit is contained in:
Pekka Enberg
2025-09-12 17:27:41 +03:00
committed by GitHub

View File

@@ -2,7 +2,7 @@ use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criteri
use pprof::criterion::{Output, PProfProfiler};
use regex::Regex;
use std::{sync::Arc, time::Instant};
use turso_core::{Database, PlatformIO};
use turso_core::{Database, LimboError, PlatformIO, StepResult};
#[cfg(not(target_family = "wasm"))]
#[global_allocator]
@@ -631,9 +631,274 @@ fn bench_insert_rows(criterion: &mut Criterion) {
group.finish();
}
/// Cooperative single-threaded write benchmark against turso.
///
/// Opens `num_connections` connections, each issuing `num_batch_inserts`
/// batch-INSERT statements of `num_inserts_per_batch` rows. Every connection
/// steps its current statement once per round, and I/O is run only after all
/// connections have stepped, so they progress cooperatively on one thread.
/// `mvcc` selects the MVCC backend when opening the database.
#[inline(never)]
fn bench_limbo(
    mvcc: bool,
    num_connections: i64,
    num_batch_inserts: i64,
    num_inserts_per_batch: usize,
) {
    // Per-connection state: the remaining batch inserts and the statement
    // currently being stepped, if any.
    struct ConnectionState {
        conn: Arc<turso_core::Connection>,
        inserts: Vec<String>,
        current_statement: Option<turso_core::Statement>,
    }
    #[allow(clippy::arc_with_non_send_sync)]
    let io = Arc::new(PlatformIO::new().unwrap());
    let temp_dir = tempfile::tempdir().unwrap();
    let path = temp_dir.path().join("bench.db");
    let db = Database::open_file(io.clone(), path.to_str().unwrap(), mvcc, false).unwrap();
    // Create the schema on a throwaway connection.
    {
        let conn = db.connect().unwrap();
        conn.execute("CREATE TABLE test (x)").unwrap();
        conn.close().unwrap();
    }
    let inserts =
        generate_inserts_per_connection(num_connections, num_batch_inserts, num_inserts_per_batch);
    let mut connections = Vec::with_capacity(num_connections as usize);
    for i in 0..num_connections {
        connections.push(ConnectionState {
            conn: db.connect().unwrap(),
            inserts: inserts[i as usize].clone(),
            current_statement: None,
        });
    }
    loop {
        // Finished when no connection has queued or in-flight work. Checked
        // before stepping so every statement gets one final I/O pass below.
        let all_finished = connections
            .iter()
            .all(|c| c.inserts.is_empty() && c.current_statement.is_none());
        for conn in connections.iter_mut() {
            if conn.current_statement.is_none() && !conn.inserts.is_empty() {
                let write = conn.inserts.pop().unwrap();
                conn.current_statement = Some(conn.conn.prepare(&write).unwrap());
            }
            if conn.current_statement.is_none() {
                continue;
            }
            let stmt = conn.current_statement.as_mut().unwrap();
            match stmt.step().unwrap() {
                // These should be the only possible cases in write concurrency:
                // no Rows because INSERT returns none, no Interrupt because
                // nothing interrupts the statement.
                StepResult::Done => {
                    conn.current_statement = None;
                }
                StepResult::IO => {
                    // Skip doing I/O here; we run I/O only after all the
                    // statements have been stepped.
                }
                StepResult::Busy => {
                    // MVCC supports concurrent writers, so Busy is impossible
                    // there; otherwise restart the statement.
                    if mvcc {
                        unreachable!();
                    }
                    stmt.reset();
                }
                _ => {
                    unreachable!()
                }
            }
        }
        db.io.step().unwrap();
        if all_finished {
            break;
        }
    }
}
/// Cooperative single-threaded MVCC write benchmark.
///
/// Like `bench_limbo`, but wraps each batch insert in an explicit
/// `BEGIN CONCURRENT` .. `COMMIT` transaction so multiple MVCC connections
/// can write concurrently. Statements are stepped round-robin and I/O runs
/// only after every connection has stepped once per round.
#[inline(never)]
fn bench_limbo_mvcc(
    mvcc: bool,
    num_connections: i64,
    num_batch_inserts: i64,
    num_inserts_per_batch: usize,
) {
    // Per-connection state. `current_insert` retains the SQL text so the
    // statement can be re-prepared after a `SchemaUpdated` error, and
    // `committing` records whether the in-flight statement is the INSERT or
    // the trailing COMMIT.
    struct ConnectionState {
        conn: Arc<turso_core::Connection>,
        inserts: Vec<String>,
        current_statement: Option<turso_core::Statement>,
        current_insert: Option<String>,
        committing: bool,
    }
    #[allow(clippy::arc_with_non_send_sync)]
    let io = Arc::new(PlatformIO::new().unwrap());
    let temp_dir = tempfile::tempdir().unwrap();
    let path = temp_dir.path().join("bench.db");
    let db = Database::open_file(io.clone(), path.to_str().unwrap(), mvcc, false).unwrap();
    let conn0 = db.connect().unwrap();
    conn0.execute("CREATE TABLE test (x)").unwrap();
    let inserts =
        generate_inserts_per_connection(num_connections, num_batch_inserts, num_inserts_per_batch);
    let mut connections = Vec::with_capacity(num_connections as usize);
    for i in 0..num_connections {
        connections.push(ConnectionState {
            conn: db.connect().unwrap(),
            inserts: inserts[i as usize].clone(),
            current_statement: None,
            current_insert: None,
            committing: false,
        });
    }
    loop {
        // Finished when no connection has queued or in-flight work.
        let all_finished = connections
            .iter()
            .all(|c| c.inserts.is_empty() && c.current_statement.is_none());
        for conn in connections.iter_mut() {
            if conn.current_statement.is_none() && !conn.inserts.is_empty() {
                let write = conn.inserts.pop().unwrap();
                // One concurrent transaction per batch so MVCC writers can
                // make progress at the same time.
                conn.conn.execute("BEGIN CONCURRENT").unwrap();
                conn.current_statement = Some(conn.conn.prepare(&write).unwrap());
                conn.current_insert = Some(write);
                conn.committing = false;
            }
            if conn.current_statement.is_none() {
                continue;
            }
            let stmt = conn.current_statement.as_mut().unwrap();
            match stmt.step() {
                // These should be the only possible cases in write
                // concurrency: no Rows because INSERT returns none, no
                // Interrupt because nothing interrupts the statement.
                Ok(StepResult::Done) => {
                    if conn.committing {
                        // COMMIT finished: this batch is fully done. (The
                        // previous code re-prepared COMMIT unconditionally on
                        // Done, so a connection never left the commit phase.)
                        conn.current_statement = None;
                        conn.current_insert = None;
                        conn.committing = false;
                    } else {
                        // Insert finished: now step the COMMIT.
                        conn.current_statement = Some(conn.conn.prepare("COMMIT").unwrap());
                        conn.committing = true;
                    }
                }
                Ok(StepResult::IO) => {
                    // Skip doing I/O here; we run I/O only after all the
                    // statements have been stepped. (An extra `stmt.step()`
                    // previously contradicted this and discarded its result.)
                }
                Ok(StepResult::Busy) => {
                    // MVCC supports concurrent writers, so Busy is impossible
                    // there; otherwise restart the statement.
                    if mvcc {
                        unreachable!();
                    }
                    stmt.reset();
                }
                Err(LimboError::SchemaUpdated) => {
                    // Schema changed under us: re-prepare whichever statement
                    // was in flight and retry on the next round.
                    let sql = if conn.committing {
                        "COMMIT".to_string()
                    } else {
                        conn.current_insert.clone().unwrap()
                    };
                    conn.current_statement = Some(conn.conn.prepare(&sql).unwrap());
                }
                Err(err) => panic!("unexpected error: {err:?}"),
                _ => {
                    unreachable!()
                }
            }
        }
        db.io.step().unwrap();
        if all_finished {
            break;
        }
    }
}
/// Builds the per-connection batch-INSERT statements.
///
/// Returns one `Vec<String>` of `num_batch_inserts` statements for each of
/// the `num_connections` connections, each statement inserting
/// `num_inserts_per_batch` rows.
// NOTE(review): starts are `num_batch_inserts * (i + j)`, so the inserted
// value ranges overlap across connections and batches. Harmless here since
// `test(x)` has no uniqueness constraint, but confirm the overlap is intended.
fn generate_inserts_per_connection(
    num_connections: i64,
    num_batch_inserts: i64,
    num_inserts_per_batch: usize,
) -> Vec<Vec<String>> {
    (0..num_connections)
        .map(|conn_idx| {
            (0..num_batch_inserts)
                .map(|batch_idx| {
                    generate_batch_insert(
                        num_batch_inserts * (conn_idx + batch_idx),
                        num_inserts_per_batch,
                    )
                })
                .collect()
        })
        .collect()
}
/// Renders a multi-row batch INSERT for the `test` table.
///
/// Produces `INSERT INTO test (x) VALUES (start),(start+1),...` with `num`
/// consecutive integer values starting at `start`.
fn generate_batch_insert(start: i64, num: usize) -> String {
    let values: Vec<String> = (0..num as i64)
        .map(|offset| format!("({})", start + offset))
        .collect();
    format!("INSERT INTO test (x) VALUES {}", values.join(","))
}
/// Criterion group comparing batched concurrent-style inserts across the
/// turso WAL backend, the turso MVCC backend, and rusqlite as a baseline.
fn bench_concurrent_writes(criterion: &mut Criterion) {
    let mut group = criterion.benchmark_group("Concurrent writes");
    let num_connections = 10;
    let num_batch_inserts = 100;
    let num_inserts_per_batch = 200_usize;
    group.bench_function("limbo_wal_concurrent_writes", |b| {
        b.iter(|| {
            bench_limbo(
                false,
                num_connections,
                num_batch_inserts,
                num_inserts_per_batch,
            );
        });
    });
    group.bench_function("limbo_mvcc_concurrent_writes", |b| {
        b.iter(|| {
            bench_limbo_mvcc(
                true,
                num_connections,
                num_batch_inserts,
                num_inserts_per_batch,
            );
        });
    });
    group.bench_function("sqlite_concurrent_writes", |b| {
        // Generate the statements once, outside the measured closure.
        let inserts = generate_inserts_per_connection(
            num_connections,
            num_batch_inserts,
            num_inserts_per_batch,
        );
        b.iter(|| {
            let temp_dir = tempfile::tempdir().unwrap();
            let path = temp_dir.path().join("bench.db");
            // Set up schema and WAL mode on a throwaway connection.
            {
                let conn = rusqlite::Connection::open(path.to_str().unwrap()).unwrap();
                conn.pragma_update(None, "synchronous", "FULL").unwrap();
                conn.pragma_update(None, "journal_mode", "WAL").unwrap();
                conn.pragma_update(None, "locking_mode", "EXCLUSIVE")
                    .unwrap();
                conn.execute("CREATE TABLE test (x INTEGER)", []).unwrap();
            }
            // One fresh connection per "connection slot"; SQLite runs the
            // batches serially, which is the baseline being measured.
            for per_conn in inserts.iter() {
                let conn = rusqlite::Connection::open(path.to_str().unwrap()).unwrap();
                for stmt in per_conn {
                    conn.execute(stmt.as_str(), []).unwrap();
                }
            }
        });
    });
    // Explicit finish for consistency with the other benchmark groups in
    // this file (e.g. bench_insert_rows).
    group.finish();
}
// Register all benchmark entry points with criterion; profiling uses pprof
// flamegraph output. Note: the rendered diff showed two `targets =` lines
// (old and new); a macro invocation must contain exactly one, so only the
// updated list (including bench_concurrent_writes) is kept.
criterion_group! {
    name = benches;
    config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
    targets = bench_open, bench_alter, bench_prepare_query, bench_execute_select_1, bench_execute_select_rows, bench_execute_select_count, bench_insert_rows, bench_concurrent_writes
}
criterion_main!(benches);