diff --git a/perf/throughput/rusqlite/src/main.rs b/perf/throughput/rusqlite/src/main.rs index 103958c71..d13bf958a 100644 --- a/perf/throughput/rusqlite/src/main.rs +++ b/perf/throughput/rusqlite/src/main.rs @@ -132,14 +132,14 @@ fn worker_thread( conn.busy_timeout(std::time::Duration::from_secs(30))?; - let mut stmt = conn.prepare("INSERT INTO test_table (id, data) VALUES (?, ?)")?; - start_barrier.wait(); let start_time = Instant::now(); let mut total_inserts = 0; for iteration in 0..iterations { + let mut stmt = conn.prepare("INSERT INTO test_table (id, data) VALUES (?, ?)")?; + conn.execute("BEGIN", [])?; for i in 0..batch_size { diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs index e08b1610c..8995fa710 100644 --- a/perf/throughput/turso/src/main.rs +++ b/perf/throughput/turso/src/main.rs @@ -1,5 +1,6 @@ use clap::{Parser, ValueEnum}; use std::sync::{Arc, Barrier}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; use tokio::runtime::Runtime; use turso::{Builder, Database, Result}; @@ -151,49 +152,65 @@ async fn worker_thread( ) -> Result { let conn = db.connect()?; - let mut stmt = conn - .prepare("INSERT INTO test_table (id, data) VALUES (?, ?)") - .await?; - start_barrier.wait(); let start_time = Instant::now(); - let mut total_inserts = 0; + let total_inserts = Arc::new(AtomicU64::new(0)); + + let mut tx_futs = vec![]; for iteration in 0..iterations { - let begin_stmt = match mode { - TransactionMode::Legacy | TransactionMode::Mvcc => "BEGIN", - TransactionMode::Concurrent => "BEGIN CONCURRENT", + let conn = conn.clone(); + let total_inserts = Arc::clone(&total_inserts); + let tx_fut = async move { + let mut stmt = conn + .prepare("INSERT INTO test_table (id, data) VALUES (?, ?)") + .await?; + + let begin_stmt = match mode { + TransactionMode::Legacy | TransactionMode::Mvcc => "BEGIN", + TransactionMode::Concurrent => "BEGIN CONCURRENT", + }; + conn.execute(begin_stmt, ()).await?; + + for i in 0..batch_size { + let id = thread_id * iterations * batch_size + iteration * batch_size + i; + stmt.execute(turso::params::Params::Positional(vec![ + turso::Value::Integer(id as i64), + turso::Value::Text(format!("data_{}", id)), + ])) + .await?; + total_inserts.fetch_add(1, Ordering::Relaxed); + } + + if think_ms > 0 { + tokio::time::sleep(tokio::time::Duration::from_millis(think_ms)).await; + } + + conn.execute("COMMIT", ()).await?; + Ok::<_, turso::Error>(()) }; - conn.execute(begin_stmt, ()).await?; + match mode { + TransactionMode::Concurrent => tx_futs.push(tx_fut), + _ => tx_fut.await?, + }; + } - for i in 0..batch_size { - let id = thread_id * iterations * batch_size + iteration * batch_size + i; - stmt.execute(turso::params::Params::Positional(vec![ - turso::Value::Integer(id as i64), - turso::Value::Text(format!("data_{}", id)), - ])) - .await?; - total_inserts += 1; - } - - if think_ms > 0 { - tokio::time::sleep(tokio::time::Duration::from_millis(think_ms)).await; - } - - conn.execute("COMMIT", ()).await?; + for tx_fut in tx_futs { + tx_fut.await?; } let elapsed = start_time.elapsed(); - let throughput = (total_inserts as f64) / elapsed.as_secs_f64(); + let final_inserts = total_inserts.load(Ordering::Relaxed); + let throughput = (final_inserts as f64) / elapsed.as_secs_f64(); println!( "Thread {}: {} inserts in {:.2}s ({:.2} inserts/sec)", thread_id, - total_inserts, + final_inserts, elapsed.as_secs_f64(), throughput ); - Ok(total_inserts) + Ok(final_inserts) }