From eeb14b25c6f7705a6fbfce296b961c7f95f1eb18 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 1 Oct 2025 08:48:02 +0300 Subject: [PATCH 1/4] perf/throughput: Replace think time with CPU-bound compute time Replace the sleep-based --think parameter with a --compute parameter that uses a busy loop to simulate realistic CPU or GPU bound business logic (e.g., parsing, data aggregation, or ML inference). The compute time is now specified in microseconds instead of milliseconds for finer granularity. --- perf/throughput/rusqlite/src/main.rs | 32 +++++++++++++++++++++------- perf/throughput/turso/src/main.rs | 32 +++++++++++++++++++--------- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/perf/throughput/rusqlite/src/main.rs b/perf/throughput/rusqlite/src/main.rs index 1dd518270..af54b8818 100644 --- a/perf/throughput/rusqlite/src/main.rs +++ b/perf/throughput/rusqlite/src/main.rs @@ -18,11 +18,11 @@ struct Args { iterations: usize, #[arg( - long = "think", + long = "compute", default_value = "0", - help = "Per transaction think time (ms)" + help = "Per transaction compute time (us)" )] - think: u64, + compute: u64, } fn main() -> Result<()> { @@ -61,7 +61,7 @@ fn main() -> Result<()> { args.batch_size, args.iterations, barrier, - args.think, + args.compute, ) }); @@ -126,7 +126,7 @@ fn worker_thread( batch_size: usize, iterations: usize, start_barrier: Arc, - think_ms: u64, + compute_usec: u64, ) -> Result { let conn = Connection::open(&db_path)?; @@ -142,14 +142,16 @@ fn worker_thread( conn.execute("BEGIN", [])?; + let result = perform_compute(thread_id, compute_usec); + + std::hint::black_box(result); + for i in 0..batch_size { let id = thread_id * iterations * batch_size + iteration * batch_size + i; stmt.execute([&id.to_string(), &format!("data_{id}")])?; total_inserts += 1; } - if think_ms > 0 { - thread::sleep(std::time::Duration::from_millis(think_ms)); - } + conn.execute("COMMIT", [])?; } @@ -166,3 +168,17 @@ fn worker_thread( Ok(total_inserts) } + +// Busy loop to simulate CPU or GPU bound computation (for example, parsing, +// data aggregation or ML inference). 
+fn perform_compute(thread_id: usize, usec: u64) -> u64 { + if usec == 0 { + return 0; + } + let start = Instant::now(); + let mut sum: u64 = 0; + while start.elapsed().as_micros() < usec as u128 { + sum = sum.wrapping_add(thread_id as u64); + } + sum +} diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs index 409f68f80..745b392d1 100644 --- a/perf/throughput/turso/src/main.rs +++ b/perf/throughput/turso/src/main.rs @@ -35,11 +35,11 @@ struct Args { mode: TransactionMode, #[arg( - long = "think", + long = "compute", default_value = "0", - help = "Per transaction think time (ms)" + help = "Per transaction compute time (us)" )] - think: u64, + compute: u64, #[arg( long = "timeout", @@ -95,7 +95,7 @@ async fn main() -> Result<()> { args.iterations, barrier, args.mode, - args.think, + args.compute, timeout, )); @@ -171,7 +171,6 @@ async fn setup_database( ) .await?; - println!("Database created at: {db_path}"); Ok(db) } @@ -183,7 +182,7 @@ async fn worker_thread( iterations: usize, start_barrier: Arc, mode: TransactionMode, - think_ms: u64, + compute_usec: u64, timeout: Duration, ) -> Result { start_barrier.wait(); @@ -208,6 +207,9 @@ async fn worker_thread( }; conn.execute(begin_stmt, ()).await?; + let result = perform_compute(thread_id, compute_usec); + std::hint::black_box(result); + for i in 0..batch_size { let id = thread_id * iterations * batch_size + iteration * batch_size + i; stmt.execute(turso::params::Params::Positional(vec![ @@ -218,10 +220,6 @@ async fn worker_thread( total_inserts.fetch_add(1, Ordering::Relaxed); } - if think_ms > 0 { - tokio::time::sleep(tokio::time::Duration::from_millis(think_ms)).await; - } - conn.execute("COMMIT", ()).await?; Ok::<_, turso::Error>(()) }; @@ -250,3 +248,17 @@ async fn worker_thread( Ok(final_inserts) } + +// Busy loop to simulate CPU or GPU bound computation (for example, parsing, +// data aggregation or ML inference). +fn perform_compute(thread_id: usize, usec: u64) -> u64 { + if usec == 0 { + return 0; + } + let start = Instant::now(); + let mut sum: u64 = 0; + while start.elapsed().as_micros() < usec as u128 { + sum = sum.wrapping_add(thread_id as u64); + } + sum +} From 63895dfecdaa2f314c7c96cf5201d8759c301482 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 1 Oct 2025 09:49:29 +0300 Subject: [PATCH 2/4] perf/throughput: Simplify benchmark output to CSV format Remove verbose output from rusqlite benchmark and output only CSV format: system,threads,batch_size,compute,throughput This makes it easier to parse and plot benchmark results. 
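As a usage sketch (directory layout and file names here are illustrative, not
part of this patch), the bench.sh wrappers added below can be run from each
benchmark directory and the per-system results merged into a single CSV for
plotting:

    cd perf/throughput/rusqlite && ./scripts/bench.sh > ../results-sqlite.csv
    cd ../turso && ./scripts/bench.sh > ../results-turso.csv
    # keep a single header row when concatenating
    (cat ../results-sqlite.csv; tail -n +2 ../results-turso.csv) > ../results.csv

cargo's build progress goes to stderr, so redirecting stdout captures only the
header line and the CSV rows.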
--- perf/throughput/rusqlite/scripts/bench.sh | 11 ++++++++ perf/throughput/rusqlite/src/main.rs | 30 ++-------------------- perf/throughput/turso/scripts/bench.sh | 11 ++++++++ perf/throughput/turso/src/main.rs | 31 ++--------------------- 4 files changed, 26 insertions(+), 57 deletions(-) create mode 100755 perf/throughput/rusqlite/scripts/bench.sh create mode 100755 perf/throughput/turso/scripts/bench.sh diff --git a/perf/throughput/rusqlite/scripts/bench.sh b/perf/throughput/rusqlite/scripts/bench.sh new file mode 100755 index 000000000..a4efcc6e5 --- /dev/null +++ b/perf/throughput/rusqlite/scripts/bench.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +cargo build --release + +echo "system,threads,batch_size,compute,throughput" + +for threads in 1 2 4 8; do + for compute in 0 100 500 1000; do + ../../../target/release/write-throughput-sqlite --threads ${threads} --batch-size 100 --compute ${compute} -i 1000 + done +done diff --git a/perf/throughput/rusqlite/src/main.rs b/perf/throughput/rusqlite/src/main.rs index af54b8818..c926f2a44 100644 --- a/perf/throughput/rusqlite/src/main.rs +++ b/perf/throughput/rusqlite/src/main.rs @@ -28,11 +28,6 @@ struct Args { fn main() -> Result<()> { let args = Args::parse(); - println!( - "Running write throughput benchmark with {} threads, {} batch size, {} iterations", - args.threads, args.batch_size, args.iterations - ); - let db_path = "write_throughput_test.db"; if std::path::Path::new(db_path).exists() { std::fs::remove_file(db_path).expect("Failed to remove existing database"); @@ -86,17 +81,9 @@ fn main() -> Result<()> { let overall_elapsed = overall_start.elapsed(); let overall_throughput = (total_inserts as f64) / overall_elapsed.as_secs_f64(); - println!("\n=== BENCHMARK RESULTS ==="); - println!("Total inserts: {total_inserts}",); - println!("Total time: {:.2}s", overall_elapsed.as_secs_f64()); - println!("Overall throughput: {overall_throughput:.2} inserts/sec"); - println!("Threads: {}", args.threads); - println!("Batch size: {}", args.batch_size); - println!("Iterations per thread: {}", args.iterations); - println!( - "Database file exists: {}", - std::path::Path::new(db_path).exists() + "SQLite,{},{},{},{:.2}", + args.threads, args.batch_size, args.compute, overall_throughput ); Ok(()) @@ -116,7 +103,6 @@ fn setup_database(db_path: &str) -> Result { [], )?; - println!("Database created at: {db_path}"); Ok(conn) } @@ -134,7 +120,6 @@ fn worker_thread( start_barrier.wait(); - let start_time = Instant::now(); let mut total_inserts = 0; for iteration in 0..iterations { @@ -155,17 +140,6 @@ fn worker_thread( conn.execute("COMMIT", [])?; } - let elapsed = start_time.elapsed(); - let throughput = (total_inserts as f64) / elapsed.as_secs_f64(); - - println!( - "Thread {}: {} inserts in {:.2}s ({:.2} inserts/sec)", - thread_id, - total_inserts, - elapsed.as_secs_f64(), - throughput - ); - Ok(total_inserts) } diff --git a/perf/throughput/turso/scripts/bench.sh b/perf/throughput/turso/scripts/bench.sh new file mode 100755 index 000000000..6d6f75d4b --- /dev/null +++ b/perf/throughput/turso/scripts/bench.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +cargo build --release + +echo "system,threads,batch_size,compute,throughput" + +for threads in 1 2 4 8; do + for compute in 0 100 500 1000; do + ../../../target/release/write-throughput --threads ${threads} --batch-size 100 --compute ${compute} -i 1000 --mode concurrent + done +done diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs index 745b392d1..8e9f13c77 100644 --- 
a/perf/throughput/turso/src/main.rs +++ b/perf/throughput/turso/src/main.rs @@ -61,11 +61,6 @@ async fn main() -> Result<()> { .init(); let args = Args::parse(); - println!( - "Running write throughput benchmark with {} threads, {} batch size, {} iterations, mode: {:?}", - args.threads, args.batch_size, args.iterations, args.mode - ); - let db_path = "write_throughput_test.db"; if std::path::Path::new(db_path).exists() { std::fs::remove_file(db_path).expect("Failed to remove existing database"); @@ -120,21 +115,10 @@ async fn main() -> Result<()> { let overall_elapsed = overall_start.elapsed(); let overall_throughput = (total_inserts as f64) / overall_elapsed.as_secs_f64(); - println!("\n=== BENCHMARK RESULTS ==="); - println!("Total inserts: {total_inserts}"); - println!("Total time: {:.2}s", overall_elapsed.as_secs_f64()); - println!("Overall throughput: {overall_throughput:.2} inserts/sec"); - println!("Threads: {}", args.threads); - println!("Batch size: {}", args.batch_size); - println!("Iterations per thread: {}", args.iterations); - println!( - "Database file exists: {}", - std::path::Path::new(db_path).exists() + "Turso,{},{},{},{:.2}", + args.threads, args.batch_size, args.compute, overall_throughput ); - if let Ok(metadata) = std::fs::metadata(db_path) { - println!("Database file size: {} bytes", metadata.len()); - } Ok(()) } @@ -187,7 +171,6 @@ async fn worker_thread( ) -> Result { start_barrier.wait(); - let start_time = Instant::now(); let total_inserts = Arc::new(AtomicU64::new(0)); let mut tx_futs = vec![]; @@ -234,17 +217,7 @@ async fn worker_thread( result?; } - let elapsed = start_time.elapsed(); let final_inserts = total_inserts.load(Ordering::Relaxed); - let throughput = (final_inserts as f64) / elapsed.as_secs_f64(); - - println!( - "Thread {}: {} inserts in {:.2}s ({:.2} inserts/sec)", - thread_id, - final_inserts, - elapsed.as_secs_f64(), - throughput - ); Ok(final_inserts) } From 3fcb0581ec6938cf72351fd1279c3389373d4e1f Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 1 Oct 2025 10:18:43 +0300 Subject: [PATCH 3/4] perf/throughput: Fix thread pool size in Turso benchmark Replace #[tokio::main] with explicit Runtime builder to set the number of tokio worker threads to match the benchmark thread count. This ensures proper thread control and avoids interference from default tokio thread pool sizing. 
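In sketch form, the pattern is roughly the following (a minimal, self-contained
example rather than the exact code in this patch; note that
Builder::new_multi_thread() does not enable tokio's timer or I/O drivers by
default, so if the benchmark relies on them, for example if --timeout is
implemented with tokio::time::timeout, an explicit .enable_all() is also
needed, something #[tokio::main] previously did implicitly):

    // Minimal sketch: pin the tokio worker pool to the benchmark's thread count.
    // Assumes the tokio crate with its multi-threaded runtime compiled in.
    fn main() {
        let threads = 4; // stand-in for args.threads
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(threads)
            // assumption: restore the timer/IO drivers that #[tokio::main] enabled implicitly
            .enable_all()
            .build()
            .expect("failed to build tokio runtime");
        rt.block_on(async {
            // the async benchmark body (async_main in this patch) runs here
        });
    }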
--- perf/throughput/turso/src/main.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs index 8e9f13c77..dbe2318f1 100644 --- a/perf/throughput/turso/src/main.rs +++ b/perf/throughput/turso/src/main.rs @@ -52,8 +52,7 @@ struct Args { io: Option, } -#[tokio::main] -async fn main() -> Result<()> { +fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .with_ansi(false) @@ -61,6 +60,15 @@ async fn main() -> Result<()> { .init(); let args = Args::parse(); + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(args.threads) + .build() + .unwrap(); + + rt.block_on(async_main(args)) +} + +async fn async_main(args: Args) -> Result<()> { let db_path = "write_throughput_test.db"; if std::path::Path::new(db_path).exists() { std::fs::remove_file(db_path).expect("Failed to remove existing database"); From 51f4f1fb8b64351d2fb238ec4689b5e540a3fefe Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 1 Oct 2025 10:11:24 +0300 Subject: [PATCH 4/4] perf/throughput: Add plotting scripts This adds few helper scripts to plot throughput results. --- perf/throughput/plot/plot-compute-write.py | 84 ++++++++++++++++++++++ perf/throughput/plot/plot-write.py | 84 ++++++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 perf/throughput/plot/plot-compute-write.py create mode 100644 perf/throughput/plot/plot-write.py diff --git a/perf/throughput/plot/plot-compute-write.py b/perf/throughput/plot/plot-compute-write.py new file mode 100644 index 000000000..1807641ad --- /dev/null +++ b/perf/throughput/plot/plot-compute-write.py @@ -0,0 +1,84 @@ +import os +import sys + +import matplotlib.pyplot as plt +import pandas as pd +import scienceplots # noqa: F401 + +plt.style.use(["science"]) +plt.rcParams.update({ + "text.usetex": True, + "font.family": "serif", + "font.serif": ["Times"], +}) + +# Get CSV filename from command line argument +if len(sys.argv) < 2: + print("Usage: python script.py ") + sys.exit(1) + +csv_filename = sys.argv[1] + +# Get basename without extension for output filename +basename = os.path.splitext(csv_filename)[0] +output_filename = f"{basename}-compute.png" + +# Read data from CSV file +df = pd.read_csv(csv_filename) + +# Create figure and axis +fig, ax = plt.subplots(figsize=(10, 6)) + +# Get unique systems and thread counts +systems = df["system"].unique() +thread_counts = sorted(df["threads"].unique()) + +# Get colors from the current color cycle +prop_cycle = plt.rcParams["axes.prop_cycle"] +colors_list = prop_cycle.by_key()["color"] + +# Plot a line for each system-thread combination +markers = ["o", "s", "^", "D"] +linestyles = ["-", "--", "-.", ":"] + +plot_idx = 0 +for sys_idx, system in enumerate(systems): + df_system = df[df["system"] == system] + for thread_idx, threads in enumerate(thread_counts): + df_thread = df_system[df_system["threads"] == threads].sort_values("compute") + if len(df_thread) > 0: + ax.plot(df_thread["compute"], df_thread["throughput"], + marker=markers[thread_idx % len(markers)], + color=colors_list[plot_idx % len(colors_list)], + linestyle=linestyles[sys_idx % len(linestyles)], + linewidth=2, markersize=8, + label=f'{system} ({threads} thread{"s" if threads > 1 else ""})') + plot_idx += 1 + +# Customize the plot +ax.set_xlabel(r"Compute Time (microseconds)", fontsize=14, fontweight="bold") +ax.set_ylabel("Throughput (rows/second)", fontsize=14, fontweight="bold") + +# Set y-axis to 
start from 0 with dynamic upper limit +max_throughput = df["throughput"].max() +ax.set_ylim(0, max_throughput * 1.15) # Add 15% tolerance for legend space + +# Format y-axis labels +ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f"{int(x/1000)}k")) + +# Add legend +ax.legend(loc="lower left", frameon=True, fontsize=11) + +# Add grid for better readability +ax.grid(axis="both", alpha=0.3, linestyle="--") +ax.set_axisbelow(True) + +# Adjust layout +plt.tight_layout() + +# Save the figure +plt.savefig(output_filename, dpi=300, bbox_inches="tight") +print(f"Saved plot to {output_filename}") + +# Display the plot +plt.show() diff --git a/perf/throughput/plot/plot-write.py b/perf/throughput/plot/plot-write.py new file mode 100644 index 000000000..f3fc6ae86 --- /dev/null +++ b/perf/throughput/plot/plot-write.py @@ -0,0 +1,84 @@ +import os +import sys + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import scienceplots # noqa: F401 + +plt.style.use(["science"]) + +# Get CSV filename from command line argument +if len(sys.argv) < 2: + print("Usage: python script.py ") + sys.exit(1) + +csv_filename = sys.argv[1] + +# Get basename without extension for output filename +basename = os.path.splitext(csv_filename)[0] +output_filename = f"{basename}-write.png" + +# Read data from CSV file +df = pd.read_csv(csv_filename) + +# Filter for compute time = 0 +df_filtered = df[df["compute"] == 0].sort_values("threads") + +# Get unique systems and threads +systems = df_filtered["system"].unique() +threads = sorted(df_filtered["threads"].unique()) + +# Create figure and axis +fig, ax = plt.subplots(figsize=(10, 6)) + +# Set up bar positions +x_pos = np.arange(len(threads)) +bar_width = 0.35 + +# Get colors from the current color cycle +prop_cycle = plt.rcParams["axes.prop_cycle"] +colors_list = prop_cycle.by_key()["color"] + +# Plot bars for each system +for i, system in enumerate(systems): + system_data = df_filtered[df_filtered["system"] == system].sort_values("threads") + throughput = system_data["throughput"].tolist() + + offset = (i - len(systems)/2 + 0.5) * bar_width + bars = ax.bar(x_pos + offset, throughput, bar_width, + label=system, + color=colors_list[i % len(colors_list)], + edgecolor="black", linewidth=1.2) + +# Customize the plot +ax.set_xlabel("Number of Threads", fontsize=14, fontweight="bold") +ax.set_ylabel("Throughput (rows/sec)", fontsize=14, fontweight="bold") + +# Set y-axis to start from 0 with dynamic upper limit +max_throughput = df_filtered["throughput"].max() +ax.set_ylim(0, max_throughput * 1.15) # Add 15% tolerance for legend space + +# Format y-axis labels +ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f"{int(x/1000)}k")) + +# Set x-axis ticks to show thread values +ax.set_xticks(x_pos) +ax.set_xticklabels(threads) + +# Add legend +ax.legend(loc="upper left", frameon=True, fontsize=12) + +# Add grid for better readability +ax.grid(axis="y", alpha=0.3, linestyle="--") +ax.set_axisbelow(True) + +# Adjust layout +plt.tight_layout() + +# Save the figure +plt.savefig(output_filename, dpi=300, bbox_inches="tight") +print(f"Saved plot to {output_filename}") + +# Display the plot +plt.show()
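Both scripts take the benchmark CSV produced by the bench.sh wrappers as their
only argument (the file name below is illustrative) and write a PNG next to
it, named after the input basename:

    python plot-write.py results.csv           # writes results-write.png: throughput vs. threads at compute=0
    python plot-compute-write.py results.csv   # writes results-compute.png: throughput vs. compute time per system and thread count

Both require matplotlib, pandas and the scienceplots style package
(plot-write.py also uses numpy); plot-compute-write.py additionally needs a
working LaTeX installation because it sets text.usetex to True.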