From cc47bfba02cdd6992bc484742d28315ae15e0297 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 15 Jul 2025 13:18:30 +0300 Subject: [PATCH] CSV import fixes - Fix not being able to create table while importing * The behavior now aligns with SQLite so that if the table already exists, all the rows are treated as data. If the table doesn't exist, the first row is treated as the header from which column names for the new table are populated. - Insert in batches instead of one at a time --- cli/commands/import.rs | 323 +++++++++++++++++++++------ perf/clickbench/.gitignore | 1 + testing/cli_tests/cli_test_cases.py | 25 +++ testing/test_files/test_w_header.csv | 3 + 4 files changed, 280 insertions(+), 72 deletions(-) create mode 100644 testing/test_files/test_w_header.csv diff --git a/cli/commands/import.rs b/cli/commands/import.rs index 209cf4375..8ec0f8537 100644 --- a/cli/commands/import.rs +++ b/cli/commands/import.rs @@ -40,52 +40,57 @@ impl<'a> ImportFile<'a> { args.table ); - match self.conn.query(table_check_query) { - Ok(rows) => { - if let Some(mut rows) = rows { - let mut table_exists = false; - loop { - match rows.step() { - Ok(turso_core::StepResult::Row) => { - table_exists = true; - break; - } - Ok(turso_core::StepResult::Done) => break, - Ok(turso_core::StepResult::IO) => { - rows.run_once().unwrap(); - } - Ok( - turso_core::StepResult::Interrupt | turso_core::StepResult::Busy, - ) => break, - Err(e) => { - let _ = self.writer.write_all( - format!("Error checking table existence: {e:?}\n").as_bytes(), - ); - return; + let table_exists = 'check: { + match self.conn.query(table_check_query) { + Ok(rows) => { + if let Some(mut rows) = rows { + loop { + match rows.step() { + Ok(turso_core::StepResult::Row) => { + break 'check true; + } + Ok(turso_core::StepResult::Done) => break 'check false, + Ok(turso_core::StepResult::IO) => { + if let Err(e) = rows.run_once() { + let _ = self.writer.write_all( + format!("Error checking table existence: {e:?}\n") + .as_bytes(), + ); + return; + } + } + Ok( + turso_core::StepResult::Interrupt + | turso_core::StepResult::Busy, + ) => { + if let Err(e) = rows.run_once() { + let _ = self.writer.write_all( + format!("Error checking table existence: {e:?}\n") + .as_bytes(), + ); + return; + } + } + Err(e) => { + let _ = self.writer.write_all( + format!("Error checking table existence: {e:?}\n") + .as_bytes(), + ); + return; + } } } } - - if !table_exists { - let _ = self.writer.write_all( - format!("Error: no such table: {}\n", args.table).as_bytes(), - ); - return; - } - } else { + false + } + Err(e) => { let _ = self .writer - .write_all(format!("Error: no such table: {}\n", args.table).as_bytes()); + .write_all(format!("Error checking table existence: {e:?}\n").as_bytes()); return; } } - Err(e) => { - let _ = self - .writer - .write_all(format!("Error checking table existence: {e:?}\n").as_bytes()); - return; - } - } + }; let file = match File::open(args.file) { Ok(file) => file, @@ -102,63 +107,237 @@ impl<'a> ImportFile<'a> { let mut success_rows = 0u64; let mut failed_rows = 0u64; - for result in rdr.records().skip(args.skip as usize) { - let record = result.unwrap(); + let mut records = rdr.records().skip(args.skip as usize).peekable(); + + // If table doesn't exist, use first row as header to create table + if !table_exists { + if let Some(Ok(header)) = records.next() { + let columns = header + .iter() + .map(normalize_ident) + .collect::>() + .join(", "); + let create_table = format!("CREATE TABLE {} ({});", args.table, columns); + + let rows = match self.conn.query(create_table) { + Ok(rows) => rows, + Err(e) => { + let _ = self + .writer + .write_all(format!("Error creating table: {e:?}\n").as_bytes()); + return; + } + }; + let Some(mut rows) = rows else { + let _ = self.writer.write_all(b"Error creating table\n"); + return; + }; + + loop { + match rows.step() { + Ok(turso_core::StepResult::IO) => { + if let Err(e) = rows.run_once() { + let _ = self + .writer + .write_all(format!("Error creating table: {e:?}\n").as_bytes()); + return; + } + } + Ok(turso_core::StepResult::Interrupt) + | Ok(turso_core::StepResult::Busy) => { + let _ = self.writer.write_all( + "Error creating table: interrupted / busy\n" + .to_string() + .as_bytes(), + ); + return; + } + Ok(turso_core::StepResult::Row) => { + // Not expected for CREATE TABLE + panic!("Unexpected row for CREATE TABLE"); + } + Ok(turso_core::StepResult::Done) => break, + Err(e) => { + let _ = self + .writer + .write_all(format!("Error creating table: {e:?}\n").as_bytes()); + return; + } + } + } + } else { + let _ = self.writer.write_all(b"Error: Empty input file\n"); + return; + } + } + + /// TODO: should this be in a single transaction (i.e. all or nothing)? + const CSV_INSERT_BATCH_SIZE: usize = 1000; + let mut batch = Vec::with_capacity(CSV_INSERT_BATCH_SIZE); + for result in records { + let record = match result { + Ok(r) => r, + Err(e) => { + failed_rows += 1; + let _ = self + .writer + .write_all(format!("Error reading row: {e:?}\n").as_bytes()); + continue; + } + }; if !record.is_empty() { - let mut values_string = String::new(); + let values: Vec = record + .iter() + .map(|r| format!("'{}'", r.replace("'", "''"))) + .collect(); + batch.push(values.join(",")); - for r in record.iter() { - values_string.push('\''); - // The string can have a single quote which needs to be escaped - values_string.push_str(&r.replace("'", "''")); - values_string.push_str("',"); - } + if batch.len() >= CSV_INSERT_BATCH_SIZE { + println!("Inserting batch of {} rows", batch.len()); + let insert_string = + format!("INSERT INTO {} VALUES ({});", args.table, batch.join("),(")); - // remove the last comma after last element - values_string.pop(); - - let insert_string = - format!("INSERT INTO {} VALUES ({});", args.table, values_string); - - match self.conn.query(insert_string) { - Ok(rows) => { - if let Some(mut rows) = rows { - while let Ok(x) = rows.step() { - match x { - turso_core::StepResult::IO => { - rows.run_once().unwrap(); + match self.conn.query(insert_string) { + Ok(rows) => { + if let Some(mut rows) = rows { + loop { + match rows.step() { + Ok(turso_core::StepResult::IO) => { + if let Err(e) = rows.run_once() { + let _ = self.writer.write_all( + format!("Error executing query: {e:?}\n") + .as_bytes(), + ); + failed_rows += batch.len() as u64; + break; + } + } + Ok(turso_core::StepResult::Done) => { + success_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Interrupt) => { + failed_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Busy) => { + let _ = self.writer.write_all(b"database is busy\n"); + failed_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Row) => { + panic!("Unexpected row for INSERT"); + } + Err(e) => { + let _ = self.writer.write_all( + format!("Error executing query: {e:?}\n") + .as_bytes(), + ); + failed_rows += batch.len() as u64; + break; + } } - turso_core::StepResult::Done => break, - turso_core::StepResult::Interrupt => break, - turso_core::StepResult::Busy => { - let _ = - self.writer.write_all("database is busy\n".as_bytes()); + } + } else { + success_rows += batch.len() as u64; + } + } + Err(e) => { + let _ = self + .writer + .write_all(format!("Error executing query: {e:?}\n").as_bytes()); + failed_rows += batch.len() as u64; + } + } + batch.clear(); + } + } + } + + // Insert remaining records + if !batch.is_empty() { + let insert_string = + format!("INSERT INTO {} VALUES ({});", args.table, batch.join("),(")); + + match self.conn.query(insert_string) { + Ok(rows) => { + if let Some(mut rows) = rows { + loop { + match rows.step() { + Ok(turso_core::StepResult::IO) => { + if let Err(e) = rows.run_once() { + let _ = self.writer.write_all( + format!("Error executing query: {e:?}\n").as_bytes(), + ); + failed_rows += batch.len() as u64; break; } - turso_core::StepResult::Row => todo!(), + } + Ok(turso_core::StepResult::Done) => { + success_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Interrupt) => { + failed_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Busy) => { + let _ = self.writer.write_all(b"database is busy\n"); + failed_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Row) => { + panic!("Unexpected row for INSERT"); + } + Err(e) => { + let _ = self.writer.write_all( + format!("Error executing query: {e:?}\n").as_bytes(), + ); + failed_rows += batch.len() as u64; + break; } } } - success_rows += 1; - } - Err(_err) => { - failed_rows += 1; + } else { + success_rows += batch.len() as u64; } } + Err(e) => { + let _ = self + .writer + .write_all(format!("Error executing query: {e:?}\n").as_bytes()); + failed_rows += batch.len() as u64; + } } } if args.verbose { let _ = self.writer.write_all( format!( - "Added {} rows with {} errors using {} lines of input\n", + "Added {} rows with {} errors using {} lines of input", success_rows, failed_rows, - success_rows + failed_rows + success_rows + failed_rows, ) .as_bytes(), ); } } } + +// https://sqlite.org/lang_keywords.html +const QUOTE_PAIRS: &[(char, char)] = &[('"', '"'), ('[', ']'), ('`', '`')]; + +pub fn normalize_ident(identifier: &str) -> String { + let quote_pair = QUOTE_PAIRS + .iter() + .find(|&(start, end)| identifier.starts_with(*start) && identifier.ends_with(*end)); + + if let Some(&(_, _)) = quote_pair { + &identifier[1..identifier.len() - 1] + } else { + identifier + } + .to_lowercase() +} diff --git a/perf/clickbench/.gitignore b/perf/clickbench/.gitignore index e5d9ddc15..5a3a490c4 100644 --- a/perf/clickbench/.gitignore +++ b/perf/clickbench/.gitignore @@ -1,2 +1,3 @@ mydb* hits.csv +hits_small.csv \ No newline at end of file diff --git a/testing/cli_tests/cli_test_cases.py b/testing/cli_tests/cli_test_cases.py index 9795fe68a..f0812d59f 100755 --- a/testing/cli_tests/cli_test_cases.py +++ b/testing/cli_tests/cli_test_cases.py @@ -228,6 +228,30 @@ def test_import_csv_skip(): shell.quit() +def test_import_csv_create_table_from_header(): + shell = TestTursoShell() + shell.run_test("open-memory", ".open :memory:", "") + # Import CSV with header - should create table automatically + shell.run_test( + "import-csv-create-table", + ".import --csv ./testing/test_files/test_w_header.csv auto_table", + "", + ) + # Verify table was created with correct column names + shell.run_test( + "verify-auto-table-schema", + ".schema auto_table", + "CREATE TABLE auto_table (id, interesting_number, interesting_string);", + ) + # Verify data was imported correctly (header row excluded) + shell.run_test( + "verify-auto-table-data", + "select * from auto_table;", + "1|2.0|String'1\n3|4.0|String2", + ) + shell.quit() + + def test_table_patterns(): shell = TestTursoShell() shell.run_test("tables-pattern", ".tables us%", "users") @@ -304,6 +328,7 @@ def main(): test_import_csv() test_import_csv_verbose() test_import_csv_skip() + test_import_csv_create_table_from_header() test_table_patterns() test_update_with_limit() test_update_with_limit_and_offset() diff --git a/testing/test_files/test_w_header.csv b/testing/test_files/test_w_header.csv new file mode 100644 index 000000000..564243429 --- /dev/null +++ b/testing/test_files/test_w_header.csv @@ -0,0 +1,3 @@ +"id","interesting_number","interesting_string" +1,2.0,"String'1" +3,4.0,"String2" \ No newline at end of file