diff --git a/perf/throughput/plot/plot-compute-write.py b/perf/throughput/plot/plot-compute-write.py
new file mode 100644
index 000000000..1807641ad
--- /dev/null
+++ b/perf/throughput/plot/plot-compute-write.py
@@ -0,0 +1,84 @@
+import os
+import sys
+
+import matplotlib.pyplot as plt
+import pandas as pd
+import scienceplots  # noqa: F401
+
+plt.style.use(["science"])
+plt.rcParams.update({
+    "text.usetex": True,
+    "font.family": "serif",
+    "font.serif": ["Times"],
+})
+
+# Get CSV filename from command line argument
+if len(sys.argv) < 2:
+    print("Usage: python script.py <csv_file>")
+    sys.exit(1)
+
+csv_filename = sys.argv[1]
+
+# Get basename without extension for output filename
+basename = os.path.splitext(csv_filename)[0]
+output_filename = f"{basename}-compute.png"
+
+# Read data from CSV file
+df = pd.read_csv(csv_filename)
+
+# Create figure and axis
+fig, ax = plt.subplots(figsize=(10, 6))
+
+# Get unique systems and thread counts
+systems = df["system"].unique()
+thread_counts = sorted(df["threads"].unique())
+
+# Get colors from the current color cycle
+prop_cycle = plt.rcParams["axes.prop_cycle"]
+colors_list = prop_cycle.by_key()["color"]
+
+# Plot a line for each system-thread combination
+markers = ["o", "s", "^", "D"]
+linestyles = ["-", "--", "-.", ":"]
+
+plot_idx = 0
+for sys_idx, system in enumerate(systems):
+    df_system = df[df["system"] == system]
+    for thread_idx, threads in enumerate(thread_counts):
+        df_thread = df_system[df_system["threads"] == threads].sort_values("compute")
+        if len(df_thread) > 0:
+            ax.plot(df_thread["compute"], df_thread["throughput"],
+                    marker=markers[thread_idx % len(markers)],
+                    color=colors_list[plot_idx % len(colors_list)],
+                    linestyle=linestyles[sys_idx % len(linestyles)],
+                    linewidth=2, markersize=8,
+                    label=f'{system} ({threads} thread{"s" if threads > 1 else ""})')
+            plot_idx += 1
+
+# Customize the plot
+ax.set_xlabel(r"Compute Time (microseconds)", fontsize=14, fontweight="bold")
+ax.set_ylabel("Throughput (rows/second)", fontsize=14, fontweight="bold")
+
+# Set y-axis to start from 0 with dynamic upper limit
+max_throughput = df["throughput"].max()
+ax.set_ylim(0, max_throughput * 1.15)  # Add 15% tolerance for legend space
+
+# Format y-axis labels
+ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f"{int(x/1000)}k"))
+
+# Add legend
+ax.legend(loc="lower left", frameon=True, fontsize=11)
+
+# Add grid for better readability
+ax.grid(axis="both", alpha=0.3, linestyle="--")
+ax.set_axisbelow(True)
+
+# Adjust layout
+plt.tight_layout()
+
+# Save the figure
+plt.savefig(output_filename, dpi=300, bbox_inches="tight")
+print(f"Saved plot to {output_filename}")
+
+# Display the plot
+plt.show()
diff --git a/perf/throughput/plot/plot-write.py b/perf/throughput/plot/plot-write.py
new file mode 100644
index 000000000..f3fc6ae86
--- /dev/null
+++ b/perf/throughput/plot/plot-write.py
@@ -0,0 +1,84 @@
+import os
+import sys
+
+import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
+import scienceplots  # noqa: F401
+
+plt.style.use(["science"])
+
+# Get CSV filename from command line argument
+if len(sys.argv) < 2:
+    print("Usage: python script.py <csv_file>")
+    sys.exit(1)
+
+csv_filename = sys.argv[1]
+
+# Get basename without extension for output filename
+basename = os.path.splitext(csv_filename)[0]
+output_filename = f"{basename}-write.png"
+
+# Read data from CSV file
+df = pd.read_csv(csv_filename)
+
+# Filter for compute time = 0
+df_filtered = df[df["compute"] == 0].sort_values("threads")
+
+# Get unique systems and threads
+systems = df_filtered["system"].unique()
+threads = sorted(df_filtered["threads"].unique())
+
+# Create figure and axis
+fig, ax = plt.subplots(figsize=(10, 6))
+
+# Set up bar positions
+x_pos = np.arange(len(threads))
+bar_width = 0.35
+
+# Get colors from the current color cycle
+prop_cycle = plt.rcParams["axes.prop_cycle"]
+colors_list = prop_cycle.by_key()["color"]
+
+# Plot bars for each system
+for i, system in enumerate(systems):
+    system_data = df_filtered[df_filtered["system"] == system].sort_values("threads")
+    throughput = system_data["throughput"].tolist()
+
+    offset = (i - len(systems)/2 + 0.5) * bar_width
+    bars = ax.bar(x_pos + offset, throughput, bar_width,
+                  label=system,
+                  color=colors_list[i % len(colors_list)],
+                  edgecolor="black", linewidth=1.2)
+
+# Customize the plot
+ax.set_xlabel("Number of Threads", fontsize=14, fontweight="bold")
+ax.set_ylabel("Throughput (rows/sec)", fontsize=14, fontweight="bold")
+
+# Set y-axis to start from 0 with dynamic upper limit
+max_throughput = df_filtered["throughput"].max()
+ax.set_ylim(0, max_throughput * 1.15)  # Add 15% tolerance for legend space
+
+# Format y-axis labels
+ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f"{int(x/1000)}k"))
+
+# Set x-axis ticks to show thread values
+ax.set_xticks(x_pos)
+ax.set_xticklabels(threads)
+
+# Add legend
+ax.legend(loc="upper left", frameon=True, fontsize=12)
+
+# Add grid for better readability
+ax.grid(axis="y", alpha=0.3, linestyle="--")
+ax.set_axisbelow(True)
+
+# Adjust layout
+plt.tight_layout()
+
+# Save the figure
+plt.savefig(output_filename, dpi=300, bbox_inches="tight")
+print(f"Saved plot to {output_filename}")
+
+# Display the plot
+plt.show()
diff --git a/perf/throughput/rusqlite/scripts/bench.sh b/perf/throughput/rusqlite/scripts/bench.sh
new file mode 100755
index 000000000..a4efcc6e5
--- /dev/null
+++ b/perf/throughput/rusqlite/scripts/bench.sh
@@ -0,0 +1,11 @@
+#!/bin/sh
+
+cargo build --release
+
+echo "system,threads,batch_size,compute,throughput"
+
+for threads in 1 2 4 8; do
+    for compute in 0 100 500 1000; do
+        ../../../target/release/write-throughput-sqlite --threads ${threads} --batch-size 100 --compute ${compute} -i 1000
+    done
+done
diff --git a/perf/throughput/rusqlite/src/main.rs b/perf/throughput/rusqlite/src/main.rs
index 1dd518270..c926f2a44 100644
--- a/perf/throughput/rusqlite/src/main.rs
+++ b/perf/throughput/rusqlite/src/main.rs
@@ -18,21 +16,16 @@ struct Args {
     iterations: usize,
 
     #[arg(
-        long = "think",
+        long = "compute",
         default_value = "0",
-        help = "Per transaction think time (ms)"
+        help = "Per transaction compute time (us)"
     )]
-    think: u64,
+    compute: u64,
 }
 
 fn main() -> Result<()> {
     let args = Args::parse();
 
-    println!(
-        "Running write throughput benchmark with {} threads, {} batch size, {} iterations",
-        args.threads, args.batch_size, args.iterations
-    );
-
     let db_path = "write_throughput_test.db";
     if std::path::Path::new(db_path).exists() {
         std::fs::remove_file(db_path).expect("Failed to remove existing database");
@@ -61,7 +56,7 @@ fn main() -> Result<()> {
                 args.batch_size,
                 args.iterations,
                 barrier,
-                args.think,
+                args.compute,
             )
         });
 
@@ -86,17 +81,9 @@ fn main() -> Result<()> {
     let overall_elapsed = overall_start.elapsed();
     let overall_throughput = (total_inserts as f64) / overall_elapsed.as_secs_f64();
 
-    println!("\n=== BENCHMARK RESULTS ===");
-    println!("Total inserts: {total_inserts}",);
-    println!("Total time: {:.2}s", overall_elapsed.as_secs_f64());
-    println!("Overall throughput: {overall_throughput:.2} inserts/sec");
-    println!("Threads: {}", args.threads);
-    println!("Batch size: {}", args.batch_size);
-    println!("Iterations per thread: {}", args.iterations);
-
     println!(
-        "Database file exists: {}",
-        std::path::Path::new(db_path).exists()
+        "SQLite,{},{},{},{:.2}",
+        args.threads, args.batch_size, args.compute, overall_throughput
     );
 
     Ok(())
@@ -116,7 +103,6 @@ fn setup_database(db_path: &str) -> Result<Connection> {
         [],
     )?;
 
-    println!("Database created at: {db_path}");
     Ok(conn)
 }
 
@@ -126,7 +112,7 @@ fn worker_thread(
     batch_size: usize,
     iterations: usize,
     start_barrier: Arc<Barrier>,
-    think_ms: u64,
+    compute_usec: u64,
 ) -> Result<usize> {
     let conn = Connection::open(&db_path)?;
 
@@ -134,7 +120,6 @@ fn worker_thread(
 
     start_barrier.wait();
 
-    let start_time = Instant::now();
     let mut total_inserts = 0;
 
     for iteration in 0..iterations {
@@ -142,27 +127,32 @@ fn worker_thread(
 
         conn.execute("BEGIN", [])?;
 
+        let result = perform_compute(thread_id, compute_usec);
+
+        std::hint::black_box(result);
+
        for i in 0..batch_size {
             let id = thread_id * iterations * batch_size + iteration * batch_size + i;
             stmt.execute([&id.to_string(), &format!("data_{id}")])?;
             total_inserts += 1;
         }
 
-        if think_ms > 0 {
-            thread::sleep(std::time::Duration::from_millis(think_ms));
-        }
+
         conn.execute("COMMIT", [])?;
     }
 
-    let elapsed = start_time.elapsed();
-    let throughput = (total_inserts as f64) / elapsed.as_secs_f64();
-
-    println!(
-        "Thread {}: {} inserts in {:.2}s ({:.2} inserts/sec)",
-        thread_id,
-        total_inserts,
-        elapsed.as_secs_f64(),
-        throughput
-    );
-
     Ok(total_inserts)
 }
+
+// Busy loop to simulate CPU or GPU bound computation (for example, parsing,
+// data aggregation or ML inference).
+fn perform_compute(thread_id: usize, usec: u64) -> u64 {
+    if usec == 0 {
+        return 0;
+    }
+    let start = Instant::now();
+    let mut sum: u64 = 0;
+    while start.elapsed().as_micros() < usec as u128 {
+        sum = sum.wrapping_add(thread_id as u64);
+    }
+    sum
+}
diff --git a/perf/throughput/turso/scripts/bench.sh b/perf/throughput/turso/scripts/bench.sh
new file mode 100755
index 000000000..6d6f75d4b
--- /dev/null
+++ b/perf/throughput/turso/scripts/bench.sh
@@ -0,0 +1,11 @@
+#!/bin/sh
+
+cargo build --release
+
+echo "system,threads,batch_size,compute,throughput"
+
+for threads in 1 2 4 8; do
+    for compute in 0 100 500 1000; do
+        ../../../target/release/write-throughput --threads ${threads} --batch-size 100 --compute ${compute} -i 1000 --mode concurrent
+    done
+done
diff --git a/perf/throughput/turso/src/main.rs b/perf/throughput/turso/src/main.rs
index 409f68f80..dbe2318f1 100644
--- a/perf/throughput/turso/src/main.rs
+++ b/perf/throughput/turso/src/main.rs
@@ -35,11 +35,11 @@ struct Args {
     mode: TransactionMode,
 
     #[arg(
-        long = "think",
+        long = "compute",
         default_value = "0",
-        help = "Per transaction think time (ms)"
+        help = "Per transaction compute time (us)"
     )]
-    think: u64,
+    compute: u64,
 
     #[arg(
         long = "timeout",
@@ -52,8 +52,7 @@ struct Args {
     io: Option<String>,
 }
 
-#[tokio::main]
-async fn main() -> Result<()> {
+fn main() -> Result<()> {
     tracing_subscriber::fmt()
         .with_env_filter(EnvFilter::from_default_env())
         .with_ansi(false)
@@ -61,11 +60,15 @@ async fn main() -> Result<()> {
         .init();
 
     let args = Args::parse();
-    println!(
-        "Running write throughput benchmark with {} threads, {} batch size, {} iterations, mode: {:?}",
-        args.threads, args.batch_size, args.iterations, args.mode
-    );
 
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(args.threads)
+        .build()
+        .unwrap();
+    rt.block_on(async_main(args))
+}
+
+async fn async_main(args: Args) -> Result<()> {
     let db_path = "write_throughput_test.db";
     if std::path::Path::new(db_path).exists() {
         std::fs::remove_file(db_path).expect("Failed to remove existing database");
@@ -95,7 +98,7 @@ async fn main() -> Result<()> {
             args.iterations,
             barrier,
             args.mode,
-            args.think,
+            args.compute,
             timeout,
         ));
 
@@ -120,21 +123,10 @@ async fn main() -> Result<()> {
     let overall_elapsed = overall_start.elapsed();
     let overall_throughput = (total_inserts as f64) / overall_elapsed.as_secs_f64();
 
-    println!("\n=== BENCHMARK RESULTS ===");
-    println!("Total inserts: {total_inserts}");
-    println!("Total time: {:.2}s", overall_elapsed.as_secs_f64());
-    println!("Overall throughput: {overall_throughput:.2} inserts/sec");
-    println!("Threads: {}", args.threads);
-    println!("Batch size: {}", args.batch_size);
-    println!("Iterations per thread: {}", args.iterations);
-
     println!(
-        "Database file exists: {}",
-        std::path::Path::new(db_path).exists()
+        "Turso,{},{},{},{:.2}",
+        args.threads, args.batch_size, args.compute, overall_throughput
     );
-    if let Ok(metadata) = std::fs::metadata(db_path) {
-        println!("Database file size: {} bytes", metadata.len());
-    }
 
     Ok(())
 }
@@ -171,7 +163,6 @@ async fn setup_database(
     )
     .await?;
 
-    println!("Database created at: {db_path}");
     Ok(db)
 }
 
@@ -183,12 +174,11 @@ async fn worker_thread(
     iterations: usize,
     start_barrier: Arc<Barrier>,
     mode: TransactionMode,
-    think_ms: u64,
+    compute_usec: u64,
     timeout: Duration,
 ) -> Result<u64> {
     start_barrier.wait();
 
-    let start_time = Instant::now();
     let total_inserts = Arc::new(AtomicU64::new(0));
     let mut tx_futs = vec![];
 
@@ -208,6 +198,9 @@ async fn worker_thread(
             };
             conn.execute(begin_stmt, ()).await?;
 
+            let result = perform_compute(thread_id, compute_usec);
+            std::hint::black_box(result);
+
             for i in 0..batch_size {
                 let id = thread_id * iterations * batch_size + iteration * batch_size + i;
                 stmt.execute(turso::params::Params::Positional(vec![
@@ -218,10 +211,6 @@ async fn worker_thread(
                 total_inserts.fetch_add(1, Ordering::Relaxed);
             }
 
-            if think_ms > 0 {
-                tokio::time::sleep(tokio::time::Duration::from_millis(think_ms)).await;
-            }
-
             conn.execute("COMMIT", ()).await?;
             Ok::<_, turso::Error>(())
         };
@@ -236,17 +225,21 @@ async fn worker_thread(
         result?;
     }
 
-    let elapsed = start_time.elapsed();
     let final_inserts = total_inserts.load(Ordering::Relaxed);
-    let throughput = (final_inserts as f64) / elapsed.as_secs_f64();
-
-    println!(
-        "Thread {}: {} inserts in {:.2}s ({:.2} inserts/sec)",
-        thread_id,
-        final_inserts,
-        elapsed.as_secs_f64(),
-        throughput
-    );
 
     Ok(final_inserts)
 }
+
+// Busy loop to simulate CPU or GPU bound computation (for example, parsing,
+// data aggregation or ML inference).
+fn perform_compute(thread_id: usize, usec: u64) -> u64 {
+    if usec == 0 {
+        return 0;
+    }
+    let start = Instant::now();
+    let mut sum: u64 = 0;
+    while start.elapsed().as_micros() < usec as u128 {
+        sum = sum.wrapping_add(thread_id as u64);
+    }
+    sum
+}