Merge 'Improve throughput benchmarks' from Pekka Enberg

Closes #3493
This commit is contained in:
Pekka Enberg
2025-10-01 15:24:03 +03:00
committed by GitHub
6 changed files with 249 additions and 76 deletions

View File

@@ -0,0 +1,84 @@
import os
import sys
import matplotlib.pyplot as plt
import pandas as pd
import scienceplots  # noqa: F401

# Match the paper's visual style: scienceplots theme plus LaTeX Times fonts.
plt.style.use(["science"])
plt.rcParams.update(
    {
        "text.usetex": True,
        "font.family": "serif",
        "font.serif": ["Times"],
    }
)

# The CSV to plot is the sole positional argument.
if len(sys.argv) < 2:
    print("Usage: python script.py <csv_filename>")
    sys.exit(1)
csv_path = sys.argv[1]

# Name the output image after the input file: foo.csv -> foo-compute.png.
stem = os.path.splitext(csv_path)[0]
out_path = f"{stem}-compute.png"

data = pd.read_csv(csv_path)

fig, ax = plt.subplots(figsize=(10, 6))

# One plotted line per (system, thread count) combination.
system_names = data["system"].unique()
thread_levels = sorted(data["threads"].unique())

# Colors come from the active style's cycle so every series is distinct.
palette = plt.rcParams["axes.prop_cycle"].by_key()["color"]
marker_set = ["o", "s", "^", "D"]
dash_set = ["-", "--", "-.", ":"]

series_no = 0
for sys_no, system in enumerate(system_names):
    per_system = data[data["system"] == system]
    for thr_no, threads in enumerate(thread_levels):
        subset = per_system[per_system["threads"] == threads].sort_values("compute")
        if len(subset) == 0:
            continue
        ax.plot(
            subset["compute"],
            subset["throughput"],
            marker=marker_set[thr_no % len(marker_set)],
            color=palette[series_no % len(palette)],
            linestyle=dash_set[sys_no % len(dash_set)],
            linewidth=2,
            markersize=8,
            label=f'{system} ({threads} thread{"s" if threads > 1 else ""})',
        )
        series_no += 1

ax.set_xlabel(r"Compute Time (microseconds)", fontsize=14, fontweight="bold")
ax.set_ylabel("Throughput (rows/second)", fontsize=14, fontweight="bold")

# Start the y axis at zero and leave 15% headroom above the tallest
# series so the legend does not overlap the data.
ax.set_ylim(0, data["throughput"].max() * 1.15)

# Render y tick labels in thousands, e.g. 250000 -> "250k".
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f"{int(x/1000)}k"))

ax.legend(loc="lower left", frameon=True, fontsize=11)

# Light dashed grid behind the data for readability.
ax.grid(axis="both", alpha=0.3, linestyle="--")
ax.set_axisbelow(True)

plt.tight_layout()
plt.savefig(out_path, dpi=300, bbox_inches="tight")
print(f"Saved plot to {out_path}")
plt.show()

View File

@@ -0,0 +1,84 @@
import os
import sys
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scienceplots  # noqa: F401

plt.style.use(["science"])

# The CSV to plot is the sole positional argument.
if len(sys.argv) < 2:
    print("Usage: python script.py <csv_filename>")
    sys.exit(1)
csv_path = sys.argv[1]

# Name the output image after the input file: foo.csv -> foo-write.png.
stem = os.path.splitext(csv_path)[0]
out_path = f"{stem}-write.png"

data = pd.read_csv(csv_path)

# Chart only the pure-write rows (no simulated per-transaction compute).
writes_only = data[data["compute"] == 0].sort_values("threads")

system_names = writes_only["system"].unique()
thread_levels = sorted(writes_only["threads"].unique())

fig, ax = plt.subplots(figsize=(10, 6))

# Grouped bars: one cluster per thread count, one bar per system.
x_pos = np.arange(len(thread_levels))
width = 0.35

# Colors come from the active style's cycle.
palette = plt.rcParams["axes.prop_cycle"].by_key()["color"]

for idx, system in enumerate(system_names):
    rows = writes_only[writes_only["system"] == system].sort_values("threads")
    heights = rows["throughput"].tolist()
    # Offset each system's bars so the cluster is centered on its x tick.
    shift = (idx - len(system_names) / 2 + 0.5) * width
    ax.bar(
        x_pos + shift,
        heights,
        width,
        label=system,
        color=palette[idx % len(palette)],
        edgecolor="black",
        linewidth=1.2,
    )

ax.set_xlabel("Number of Threads", fontsize=14, fontweight="bold")
ax.set_ylabel("Throughput (rows/sec)", fontsize=14, fontweight="bold")

# Start the y axis at zero with 15% headroom so the legend fits.
ax.set_ylim(0, writes_only["throughput"].max() * 1.15)

# Render y tick labels in thousands, e.g. 250000 -> "250k".
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f"{int(x/1000)}k"))

# Label each cluster with its thread count.
ax.set_xticks(x_pos)
ax.set_xticklabels(thread_levels)

ax.legend(loc="upper left", frameon=True, fontsize=12)

# Light dashed horizontal grid behind the bars.
ax.grid(axis="y", alpha=0.3, linestyle="--")
ax.set_axisbelow(True)

plt.tight_layout()
plt.savefig(out_path, dpi=300, bbox_inches="tight")
print(f"Saved plot to {out_path}")
plt.show()

View File

@@ -0,0 +1,11 @@
#!/bin/sh
# Build the benchmark binary, then sweep thread counts and per-transaction
# compute times, emitting one CSV row per run (header printed first).
cargo build --release
echo "system,threads,batch_size,compute,throughput"
for t in 1 2 4 8; do
    for c in 0 100 500 1000; do
        ../../../target/release/write-throughput-sqlite \
            --threads "$t" --batch-size 100 --compute "$c" -i 1000
    done
done

View File

@@ -18,21 +18,16 @@ struct Args {
iterations: usize,
#[arg(
long = "think",
long = "compute",
default_value = "0",
help = "Per transaction think time (ms)"
help = "Per transaction compute time (us)"
)]
think: u64,
compute: u64,
}
fn main() -> Result<()> {
let args = Args::parse();
println!(
"Running write throughput benchmark with {} threads, {} batch size, {} iterations",
args.threads, args.batch_size, args.iterations
);
let db_path = "write_throughput_test.db";
if std::path::Path::new(db_path).exists() {
std::fs::remove_file(db_path).expect("Failed to remove existing database");
@@ -61,7 +56,7 @@ fn main() -> Result<()> {
args.batch_size,
args.iterations,
barrier,
args.think,
args.compute,
)
});
@@ -86,17 +81,9 @@ fn main() -> Result<()> {
let overall_elapsed = overall_start.elapsed();
let overall_throughput = (total_inserts as f64) / overall_elapsed.as_secs_f64();
println!("\n=== BENCHMARK RESULTS ===");
println!("Total inserts: {total_inserts}",);
println!("Total time: {:.2}s", overall_elapsed.as_secs_f64());
println!("Overall throughput: {overall_throughput:.2} inserts/sec");
println!("Threads: {}", args.threads);
println!("Batch size: {}", args.batch_size);
println!("Iterations per thread: {}", args.iterations);
println!(
"Database file exists: {}",
std::path::Path::new(db_path).exists()
"SQLite,{},{},{},{:.2}",
args.threads, args.batch_size, args.compute, overall_throughput
);
Ok(())
@@ -116,7 +103,6 @@ fn setup_database(db_path: &str) -> Result<Connection> {
[],
)?;
println!("Database created at: {db_path}");
Ok(conn)
}
@@ -126,7 +112,7 @@ fn worker_thread(
batch_size: usize,
iterations: usize,
start_barrier: Arc<Barrier>,
think_ms: u64,
compute_usec: u64,
) -> Result<u64> {
let conn = Connection::open(&db_path)?;
@@ -134,7 +120,6 @@ fn worker_thread(
start_barrier.wait();
let start_time = Instant::now();
let mut total_inserts = 0;
for iteration in 0..iterations {
@@ -142,27 +127,32 @@ fn worker_thread(
conn.execute("BEGIN", [])?;
let result = perform_compute(thread_id, compute_usec);
std::hint::black_box(result);
for i in 0..batch_size {
let id = thread_id * iterations * batch_size + iteration * batch_size + i;
stmt.execute([&id.to_string(), &format!("data_{id}")])?;
total_inserts += 1;
}
if think_ms > 0 {
thread::sleep(std::time::Duration::from_millis(think_ms));
}
conn.execute("COMMIT", [])?;
}
let elapsed = start_time.elapsed();
let throughput = (total_inserts as f64) / elapsed.as_secs_f64();
println!(
"Thread {}: {} inserts in {:.2}s ({:.2} inserts/sec)",
thread_id,
total_inserts,
elapsed.as_secs_f64(),
throughput
);
Ok(total_inserts)
}
/// Busy loop to simulate CPU or GPU bound computation (for example, parsing,
/// data aggregation or ML inference).
///
/// Spins for `usec` microseconds; returns an accumulator derived from
/// `thread_id` (the caller feeds it to `std::hint::black_box` so the
/// work cannot be elided). `usec == 0` returns immediately with 0.
fn perform_compute(thread_id: usize, usec: u64) -> u64 {
    // No compute phase requested: skip the clock read entirely.
    if usec == 0 {
        return 0;
    }
    let budget = u128::from(usec);
    let begin = Instant::now();
    let mut acc: u64 = 0;
    while begin.elapsed().as_micros() < budget {
        acc = acc.wrapping_add(thread_id as u64);
    }
    acc
}

View File

@@ -0,0 +1,11 @@
#!/bin/sh
# Build the benchmark binary, then sweep thread counts and per-transaction
# compute times in concurrent mode, emitting one CSV row per run
# (header printed first).
cargo build --release
echo "system,threads,batch_size,compute,throughput"
for t in 1 2 4 8; do
    for c in 0 100 500 1000; do
        ../../../target/release/write-throughput \
            --threads "$t" --batch-size 100 --compute "$c" -i 1000 --mode concurrent
    done
done

View File

@@ -35,11 +35,11 @@ struct Args {
mode: TransactionMode,
#[arg(
long = "think",
long = "compute",
default_value = "0",
help = "Per transaction think time (ms)"
help = "Per transaction compute time (us)"
)]
think: u64,
compute: u64,
#[arg(
long = "timeout",
@@ -52,8 +52,7 @@ struct Args {
io: Option<IoOption>,
}
#[tokio::main]
async fn main() -> Result<()> {
fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_ansi(false)
@@ -61,11 +60,15 @@ async fn main() -> Result<()> {
.init();
let args = Args::parse();
println!(
"Running write throughput benchmark with {} threads, {} batch size, {} iterations, mode: {:?}",
args.threads, args.batch_size, args.iterations, args.mode
);
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(args.threads)
.build()
.unwrap();
rt.block_on(async_main(args))
}
async fn async_main(args: Args) -> Result<()> {
let db_path = "write_throughput_test.db";
if std::path::Path::new(db_path).exists() {
std::fs::remove_file(db_path).expect("Failed to remove existing database");
@@ -95,7 +98,7 @@ async fn main() -> Result<()> {
args.iterations,
barrier,
args.mode,
args.think,
args.compute,
timeout,
));
@@ -120,21 +123,10 @@ async fn main() -> Result<()> {
let overall_elapsed = overall_start.elapsed();
let overall_throughput = (total_inserts as f64) / overall_elapsed.as_secs_f64();
println!("\n=== BENCHMARK RESULTS ===");
println!("Total inserts: {total_inserts}");
println!("Total time: {:.2}s", overall_elapsed.as_secs_f64());
println!("Overall throughput: {overall_throughput:.2} inserts/sec");
println!("Threads: {}", args.threads);
println!("Batch size: {}", args.batch_size);
println!("Iterations per thread: {}", args.iterations);
println!(
"Database file exists: {}",
std::path::Path::new(db_path).exists()
"Turso,{},{},{},{:.2}",
args.threads, args.batch_size, args.compute, overall_throughput
);
if let Ok(metadata) = std::fs::metadata(db_path) {
println!("Database file size: {} bytes", metadata.len());
}
Ok(())
}
@@ -171,7 +163,6 @@ async fn setup_database(
)
.await?;
println!("Database created at: {db_path}");
Ok(db)
}
@@ -183,12 +174,11 @@ async fn worker_thread(
iterations: usize,
start_barrier: Arc<Barrier>,
mode: TransactionMode,
think_ms: u64,
compute_usec: u64,
timeout: Duration,
) -> Result<u64> {
start_barrier.wait();
let start_time = Instant::now();
let total_inserts = Arc::new(AtomicU64::new(0));
let mut tx_futs = vec![];
@@ -208,6 +198,9 @@ async fn worker_thread(
};
conn.execute(begin_stmt, ()).await?;
let result = perform_compute(thread_id, compute_usec);
std::hint::black_box(result);
for i in 0..batch_size {
let id = thread_id * iterations * batch_size + iteration * batch_size + i;
stmt.execute(turso::params::Params::Positional(vec![
@@ -218,10 +211,6 @@ async fn worker_thread(
total_inserts.fetch_add(1, Ordering::Relaxed);
}
if think_ms > 0 {
tokio::time::sleep(tokio::time::Duration::from_millis(think_ms)).await;
}
conn.execute("COMMIT", ()).await?;
Ok::<_, turso::Error>(())
};
@@ -236,17 +225,21 @@ async fn worker_thread(
result?;
}
let elapsed = start_time.elapsed();
let final_inserts = total_inserts.load(Ordering::Relaxed);
let throughput = (final_inserts as f64) / elapsed.as_secs_f64();
println!(
"Thread {}: {} inserts in {:.2}s ({:.2} inserts/sec)",
thread_id,
final_inserts,
elapsed.as_secs_f64(),
throughput
);
Ok(final_inserts)
}
/// Busy loop to simulate CPU or GPU bound computation (for example, parsing,
/// data aggregation or ML inference).
///
/// Returns 0 immediately when `usec` is 0; otherwise spins until `usec`
/// microseconds have elapsed, accumulating `thread_id` each iteration.
/// The caller passes the result through `std::hint::black_box` so the
/// optimizer cannot discard the spin work.
fn perform_compute(thread_id: usize, usec: u64) -> u64 {
    let mut acc: u64 = 0;
    if usec > 0 {
        let begin = Instant::now();
        let budget = usec as u128;
        while begin.elapsed().as_micros() < budget {
            acc = acc.wrapping_add(thread_id as u64);
        }
    }
    acc
}