Merge 'perf/throughput/turso: Async transactions with concurrent mode' from Pekka Enberg

With `BEGIN CONCURRENT`, we should also take advantage of async
transaction processing to maximize concurrency.

Closes #3082
This commit is contained in:
Pekka Enberg
2025-09-13 15:07:29 +03:00
committed by GitHub
2 changed files with 46 additions and 29 deletions

View File

@@ -132,14 +132,14 @@ fn worker_thread(
conn.busy_timeout(std::time::Duration::from_secs(30))?;
let mut stmt = conn.prepare("INSERT INTO test_table (id, data) VALUES (?, ?)")?;
start_barrier.wait();
let start_time = Instant::now();
let mut total_inserts = 0;
for iteration in 0..iterations {
let mut stmt = conn.prepare("INSERT INTO test_table (id, data) VALUES (?, ?)")?;
conn.execute("BEGIN", [])?;
for i in 0..batch_size {

View File

@@ -1,5 +1,6 @@
use clap::{Parser, ValueEnum};
use std::sync::{Arc, Barrier};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use tokio::runtime::Runtime;
use turso::{Builder, Database, Result};
@@ -151,49 +152,65 @@ async fn worker_thread(
) -> Result<u64> {
let conn = db.connect()?;
let mut stmt = conn
.prepare("INSERT INTO test_table (id, data) VALUES (?, ?)")
.await?;
start_barrier.wait();
let start_time = Instant::now();
let mut total_inserts = 0;
let total_inserts = Arc::new(AtomicU64::new(0));
let mut tx_futs = vec![];
for iteration in 0..iterations {
let begin_stmt = match mode {
TransactionMode::Legacy | TransactionMode::Mvcc => "BEGIN",
TransactionMode::Concurrent => "BEGIN CONCURRENT",
let conn = conn.clone();
let total_inserts = Arc::clone(&total_inserts);
let tx_fut = async move {
let mut stmt = conn
.prepare("INSERT INTO test_table (id, data) VALUES (?, ?)")
.await?;
let begin_stmt = match mode {
TransactionMode::Legacy | TransactionMode::Mvcc => "BEGIN",
TransactionMode::Concurrent => "BEGIN CONCURRENT",
};
conn.execute(begin_stmt, ()).await?;
for i in 0..batch_size {
let id = thread_id * iterations * batch_size + iteration * batch_size + i;
stmt.execute(turso::params::Params::Positional(vec![
turso::Value::Integer(id as i64),
turso::Value::Text(format!("data_{}", id)),
]))
.await?;
total_inserts.fetch_add(1, Ordering::Relaxed);
}
if think_ms > 0 {
tokio::time::sleep(tokio::time::Duration::from_millis(think_ms)).await;
}
conn.execute("COMMIT", ()).await?;
Ok::<_, turso::Error>(())
};
conn.execute(begin_stmt, ()).await?;
match mode {
TransactionMode::Concurrent => tx_futs.push(tx_fut),
_ => tx_fut.await?,
};
}
for i in 0..batch_size {
let id = thread_id * iterations * batch_size + iteration * batch_size + i;
stmt.execute(turso::params::Params::Positional(vec![
turso::Value::Integer(id as i64),
turso::Value::Text(format!("data_{}", id)),
]))
.await?;
total_inserts += 1;
}
if think_ms > 0 {
tokio::time::sleep(tokio::time::Duration::from_millis(think_ms)).await;
}
conn.execute("COMMIT", ()).await?;
for tx_fut in tx_futs {
tx_fut.await?;
}
let elapsed = start_time.elapsed();
let throughput = (total_inserts as f64) / elapsed.as_secs_f64();
let final_inserts = total_inserts.load(Ordering::Relaxed);
let throughput = (final_inserts as f64) / elapsed.as_secs_f64();
println!(
"Thread {}: {} inserts in {:.2}s ({:.2} inserts/sec)",
thread_id,
total_inserts,
final_inserts,
elapsed.as_secs_f64(),
throughput
);
Ok(total_inserts)
Ok(final_inserts)
}