diff --git a/cli/commands/import.rs b/cli/commands/import.rs index 209cf4375..8ec0f8537 100644 --- a/cli/commands/import.rs +++ b/cli/commands/import.rs @@ -40,52 +40,57 @@ impl<'a> ImportFile<'a> { args.table ); - match self.conn.query(table_check_query) { - Ok(rows) => { - if let Some(mut rows) = rows { - let mut table_exists = false; - loop { - match rows.step() { - Ok(turso_core::StepResult::Row) => { - table_exists = true; - break; - } - Ok(turso_core::StepResult::Done) => break, - Ok(turso_core::StepResult::IO) => { - rows.run_once().unwrap(); - } - Ok( - turso_core::StepResult::Interrupt | turso_core::StepResult::Busy, - ) => break, - Err(e) => { - let _ = self.writer.write_all( - format!("Error checking table existence: {e:?}\n").as_bytes(), - ); - return; + let table_exists = 'check: { + match self.conn.query(table_check_query) { + Ok(rows) => { + if let Some(mut rows) = rows { + loop { + match rows.step() { + Ok(turso_core::StepResult::Row) => { + break 'check true; + } + Ok(turso_core::StepResult::Done) => break 'check false, + Ok(turso_core::StepResult::IO) => { + if let Err(e) = rows.run_once() { + let _ = self.writer.write_all( + format!("Error checking table existence: {e:?}\n") + .as_bytes(), + ); + return; + } + } + Ok( + turso_core::StepResult::Interrupt + | turso_core::StepResult::Busy, + ) => { + if let Err(e) = rows.run_once() { + let _ = self.writer.write_all( + format!("Error checking table existence: {e:?}\n") + .as_bytes(), + ); + return; + } + } + Err(e) => { + let _ = self.writer.write_all( + format!("Error checking table existence: {e:?}\n") + .as_bytes(), + ); + return; + } } } } - - if !table_exists { - let _ = self.writer.write_all( - format!("Error: no such table: {}\n", args.table).as_bytes(), - ); - return; - } - } else { + false + } + Err(e) => { let _ = self .writer - .write_all(format!("Error: no such table: {}\n", args.table).as_bytes()); + .write_all(format!("Error checking table existence: {e:?}\n").as_bytes()); return; } } - 
Err(e) => { - let _ = self - .writer - .write_all(format!("Error checking table existence: {e:?}\n").as_bytes()); - return; - } - } + }; let file = match File::open(args.file) { Ok(file) => file, @@ -102,63 +107,237 @@ impl<'a> ImportFile<'a> { let mut success_rows = 0u64; let mut failed_rows = 0u64; - for result in rdr.records().skip(args.skip as usize) { - let record = result.unwrap(); + let mut records = rdr.records().skip(args.skip as usize).peekable(); + + // If table doesn't exist, use first row as header to create table + if !table_exists { + if let Some(Ok(header)) = records.next() { + let columns = header + .iter() + .map(normalize_ident) + .collect::>() + .join(", "); + let create_table = format!("CREATE TABLE {} ({});", args.table, columns); + + let rows = match self.conn.query(create_table) { + Ok(rows) => rows, + Err(e) => { + let _ = self + .writer + .write_all(format!("Error creating table: {e:?}\n").as_bytes()); + return; + } + }; + let Some(mut rows) = rows else { + let _ = self.writer.write_all(b"Error creating table\n"); + return; + }; + + loop { + match rows.step() { + Ok(turso_core::StepResult::IO) => { + if let Err(e) = rows.run_once() { + let _ = self + .writer + .write_all(format!("Error creating table: {e:?}\n").as_bytes()); + return; + } + } + Ok(turso_core::StepResult::Interrupt) + | Ok(turso_core::StepResult::Busy) => { + let _ = self.writer.write_all( + "Error creating table: interrupted / busy\n" + .to_string() + .as_bytes(), + ); + return; + } + Ok(turso_core::StepResult::Row) => { + // Not expected for CREATE TABLE + panic!("Unexpected row for CREATE TABLE"); + } + Ok(turso_core::StepResult::Done) => break, + Err(e) => { + let _ = self + .writer + .write_all(format!("Error creating table: {e:?}\n").as_bytes()); + return; + } + } + } + } else { + let _ = self.writer.write_all(b"Error: Empty input file\n"); + return; + } + } + + /// TODO: should this be in a single transaction (i.e. all or nothing)? 
+ const CSV_INSERT_BATCH_SIZE: usize = 1000; + let mut batch = Vec::with_capacity(CSV_INSERT_BATCH_SIZE); + for result in records { + let record = match result { + Ok(r) => r, + Err(e) => { + failed_rows += 1; + let _ = self + .writer + .write_all(format!("Error reading row: {e:?}\n").as_bytes()); + continue; + } + }; if !record.is_empty() { - let mut values_string = String::new(); + let values: Vec = record + .iter() + .map(|r| format!("'{}'", r.replace("'", "''"))) + .collect(); + batch.push(values.join(",")); - for r in record.iter() { - values_string.push('\''); - // The string can have a single quote which needs to be escaped - values_string.push_str(&r.replace("'", "''")); - values_string.push_str("',"); - } + if batch.len() >= CSV_INSERT_BATCH_SIZE { + println!("Inserting batch of {} rows", batch.len()); + let insert_string = + format!("INSERT INTO {} VALUES ({});", args.table, batch.join("),(")); - // remove the last comma after last element - values_string.pop(); - - let insert_string = - format!("INSERT INTO {} VALUES ({});", args.table, values_string); - - match self.conn.query(insert_string) { - Ok(rows) => { - if let Some(mut rows) = rows { - while let Ok(x) = rows.step() { - match x { - turso_core::StepResult::IO => { - rows.run_once().unwrap(); + match self.conn.query(insert_string) { + Ok(rows) => { + if let Some(mut rows) = rows { + loop { + match rows.step() { + Ok(turso_core::StepResult::IO) => { + if let Err(e) = rows.run_once() { + let _ = self.writer.write_all( + format!("Error executing query: {e:?}\n") + .as_bytes(), + ); + failed_rows += batch.len() as u64; + break; + } + } + Ok(turso_core::StepResult::Done) => { + success_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Interrupt) => { + failed_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Busy) => { + let _ = self.writer.write_all(b"database is busy\n"); + failed_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Row) => { + 
panic!("Unexpected row for INSERT"); + } + Err(e) => { + let _ = self.writer.write_all( + format!("Error executing query: {e:?}\n") + .as_bytes(), + ); + failed_rows += batch.len() as u64; + break; + } } - turso_core::StepResult::Done => break, - turso_core::StepResult::Interrupt => break, - turso_core::StepResult::Busy => { - let _ = - self.writer.write_all("database is busy\n".as_bytes()); + } + } else { + success_rows += batch.len() as u64; + } + } + Err(e) => { + let _ = self + .writer + .write_all(format!("Error executing query: {e:?}\n").as_bytes()); + failed_rows += batch.len() as u64; + } + } + batch.clear(); + } + } + } + + // Insert remaining records + if !batch.is_empty() { + let insert_string = + format!("INSERT INTO {} VALUES ({});", args.table, batch.join("),(")); + + match self.conn.query(insert_string) { + Ok(rows) => { + if let Some(mut rows) = rows { + loop { + match rows.step() { + Ok(turso_core::StepResult::IO) => { + if let Err(e) = rows.run_once() { + let _ = self.writer.write_all( + format!("Error executing query: {e:?}\n").as_bytes(), + ); + failed_rows += batch.len() as u64; break; } - turso_core::StepResult::Row => todo!(), + } + Ok(turso_core::StepResult::Done) => { + success_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Interrupt) => { + failed_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Busy) => { + let _ = self.writer.write_all(b"database is busy\n"); + failed_rows += batch.len() as u64; + break; + } + Ok(turso_core::StepResult::Row) => { + panic!("Unexpected row for INSERT"); + } + Err(e) => { + let _ = self.writer.write_all( + format!("Error executing query: {e:?}\n").as_bytes(), + ); + failed_rows += batch.len() as u64; + break; } } } - success_rows += 1; - } - Err(_err) => { - failed_rows += 1; + } else { + success_rows += batch.len() as u64; } } + Err(e) => { + let _ = self + .writer + .write_all(format!("Error executing query: {e:?}\n").as_bytes()); + failed_rows += batch.len() as 
// https://sqlite.org/lang_keywords.html
// Opening/closing delimiter pairs that may wrap a quoted identifier.
const QUOTE_PAIRS: &[(char, char)] = &[('"', '"'), ('[', ']'), ('`', '`')];

/// Normalizes an identifier by removing one pair of surrounding quote
/// delimiters (`"…"`, `[…]` or `` `…` ``) and lowercasing the result.
///
/// An identifier that is not wrapped in a matching delimiter pair is only
/// lowercased. The opening and closing delimiters must be two distinct
/// characters of the input: a lone `"` or `` ` `` is returned as-is instead
/// of being treated as an (empty) quoted identifier.
///
/// # Examples
///
/// ```ignore
/// assert_eq!(normalize_ident("\"Id\""), "id");
/// assert_eq!(normalize_ident("[Col Name]"), "col name");
/// assert_eq!(normalize_ident("Plain"), "plain");
/// ```
pub fn normalize_ident(identifier: &str) -> String {
    QUOTE_PAIRS
        .iter()
        .find_map(|&(open, close)| {
            // Strip the prefix first and then the suffix from the remainder, so a
            // single quote character can never count as both delimiters (which
            // previously produced an inverted slice range and a panic).
            identifier
                .strip_prefix(open)
                .and_then(|rest| rest.strip_suffix(close))
        })
        .unwrap_or(identifier)
        .to_lowercase()
}
TestTursoShell() shell.run_test("tables-pattern", ".tables us%", "users") @@ -304,6 +328,7 @@ def main(): test_import_csv() test_import_csv_verbose() test_import_csv_skip() + test_import_csv_create_table_from_header() test_table_patterns() test_update_with_limit() test_update_with_limit_and_offset() diff --git a/testing/test_files/test_w_header.csv b/testing/test_files/test_w_header.csv new file mode 100644 index 000000000..564243429 --- /dev/null +++ b/testing/test_files/test_w_header.csv @@ -0,0 +1,3 @@ +"id","interesting_number","interesting_string" +1,2.0,"String'1" +3,4.0,"String2" \ No newline at end of file