CSV import fixes

- Fix not being able to create table while importing
    * The behavior now aligns with SQLite so that if the table already
      exists, all the rows are treated as data. If the table doesn't exist,
      the first row is treated as the header from which column names for the
      new table are populated.
- Insert in batches instead of one at a time
This commit is contained in:
Jussi Saurio
2025-07-15 13:18:30 +03:00
parent beaf393476
commit cc47bfba02
4 changed files with 280 additions and 72 deletions

View File

@@ -40,44 +40,48 @@ impl<'a> ImportFile<'a> {
args.table
);
let table_exists = 'check: {
match self.conn.query(table_check_query) {
Ok(rows) => {
if let Some(mut rows) = rows {
let mut table_exists = false;
loop {
match rows.step() {
Ok(turso_core::StepResult::Row) => {
table_exists = true;
break;
break 'check true;
}
Ok(turso_core::StepResult::Done) => break,
Ok(turso_core::StepResult::Done) => break 'check false,
Ok(turso_core::StepResult::IO) => {
rows.run_once().unwrap();
if let Err(e) = rows.run_once() {
let _ = self.writer.write_all(
format!("Error checking table existence: {e:?}\n")
.as_bytes(),
);
return;
}
}
Ok(
turso_core::StepResult::Interrupt | turso_core::StepResult::Busy,
) => break,
turso_core::StepResult::Interrupt
| turso_core::StepResult::Busy,
) => {
if let Err(e) = rows.run_once() {
let _ = self.writer.write_all(
format!("Error checking table existence: {e:?}\n")
.as_bytes(),
);
return;
}
}
Err(e) => {
let _ = self.writer.write_all(
format!("Error checking table existence: {e:?}\n").as_bytes(),
format!("Error checking table existence: {e:?}\n")
.as_bytes(),
);
return;
}
}
}
if !table_exists {
let _ = self.writer.write_all(
format!("Error: no such table: {}\n", args.table).as_bytes(),
);
return;
}
} else {
let _ = self
.writer
.write_all(format!("Error: no such table: {}\n", args.table).as_bytes());
return;
}
false
}
Err(e) => {
let _ = self
@@ -86,6 +90,7 @@ impl<'a> ImportFile<'a> {
return;
}
}
};
let file = match File::open(args.file) {
Ok(file) => file,
@@ -102,49 +107,207 @@ impl<'a> ImportFile<'a> {
let mut success_rows = 0u64;
let mut failed_rows = 0u64;
for result in rdr.records().skip(args.skip as usize) {
let record = result.unwrap();
let mut records = rdr.records().skip(args.skip as usize).peekable();
if !record.is_empty() {
let mut values_string = String::new();
// If table doesn't exist, use first row as header to create table
if !table_exists {
if let Some(Ok(header)) = records.next() {
let columns = header
.iter()
.map(normalize_ident)
.collect::<Vec<_>>()
.join(", ");
let create_table = format!("CREATE TABLE {} ({});", args.table, columns);
for r in record.iter() {
values_string.push('\'');
// The string can have a single quote which needs to be escaped
values_string.push_str(&r.replace("'", "''"));
values_string.push_str("',");
let rows = match self.conn.query(create_table) {
Ok(rows) => rows,
Err(e) => {
let _ = self
.writer
.write_all(format!("Error creating table: {e:?}\n").as_bytes());
return;
}
};
let Some(mut rows) = rows else {
let _ = self.writer.write_all(b"Error creating table\n");
return;
};
loop {
match rows.step() {
Ok(turso_core::StepResult::IO) => {
if let Err(e) = rows.run_once() {
let _ = self
.writer
.write_all(format!("Error creating table: {e:?}\n").as_bytes());
return;
}
}
Ok(turso_core::StepResult::Interrupt)
| Ok(turso_core::StepResult::Busy) => {
let _ = self.writer.write_all(
"Error creating table: interrupted / busy\n"
.to_string()
.as_bytes(),
);
return;
}
Ok(turso_core::StepResult::Row) => {
// Not expected for CREATE TABLE
panic!("Unexpected row for CREATE TABLE");
}
Ok(turso_core::StepResult::Done) => break,
Err(e) => {
let _ = self
.writer
.write_all(format!("Error creating table: {e:?}\n").as_bytes());
return;
}
}
}
} else {
let _ = self.writer.write_all(b"Error: Empty input file\n");
return;
}
}
// remove the last comma after last element
values_string.pop();
/// TODO: should this be in a single transaction (i.e. all or nothing)?
const CSV_INSERT_BATCH_SIZE: usize = 1000;
let mut batch = Vec::with_capacity(CSV_INSERT_BATCH_SIZE);
for result in records {
let record = match result {
Ok(r) => r,
Err(e) => {
failed_rows += 1;
let _ = self
.writer
.write_all(format!("Error reading row: {e:?}\n").as_bytes());
continue;
}
};
if !record.is_empty() {
let values: Vec<String> = record
.iter()
.map(|r| format!("'{}'", r.replace("'", "''")))
.collect();
batch.push(values.join(","));
if batch.len() >= CSV_INSERT_BATCH_SIZE {
println!("Inserting batch of {} rows", batch.len());
let insert_string =
format!("INSERT INTO {} VALUES ({});", args.table, values_string);
format!("INSERT INTO {} VALUES ({});", args.table, batch.join("),("));
match self.conn.query(insert_string) {
Ok(rows) => {
if let Some(mut rows) = rows {
while let Ok(x) = rows.step() {
match x {
turso_core::StepResult::IO => {
rows.run_once().unwrap();
}
turso_core::StepResult::Done => break,
turso_core::StepResult::Interrupt => break,
turso_core::StepResult::Busy => {
let _ =
self.writer.write_all("database is busy\n".as_bytes());
loop {
match rows.step() {
Ok(turso_core::StepResult::IO) => {
if let Err(e) = rows.run_once() {
let _ = self.writer.write_all(
format!("Error executing query: {e:?}\n")
.as_bytes(),
);
failed_rows += batch.len() as u64;
break;
}
turso_core::StepResult::Row => todo!(),
}
Ok(turso_core::StepResult::Done) => {
success_rows += batch.len() as u64;
break;
}
Ok(turso_core::StepResult::Interrupt) => {
failed_rows += batch.len() as u64;
break;
}
Ok(turso_core::StepResult::Busy) => {
let _ = self.writer.write_all(b"database is busy\n");
failed_rows += batch.len() as u64;
break;
}
Ok(turso_core::StepResult::Row) => {
panic!("Unexpected row for INSERT");
}
Err(e) => {
let _ = self.writer.write_all(
format!("Error executing query: {e:?}\n")
.as_bytes(),
);
failed_rows += batch.len() as u64;
break;
}
}
}
success_rows += 1;
} else {
success_rows += batch.len() as u64;
}
Err(_err) => {
failed_rows += 1;
}
Err(e) => {
let _ = self
.writer
.write_all(format!("Error executing query: {e:?}\n").as_bytes());
failed_rows += batch.len() as u64;
}
}
batch.clear();
}
}
}
// Insert remaining records
if !batch.is_empty() {
let insert_string =
format!("INSERT INTO {} VALUES ({});", args.table, batch.join("),("));
match self.conn.query(insert_string) {
Ok(rows) => {
if let Some(mut rows) = rows {
loop {
match rows.step() {
Ok(turso_core::StepResult::IO) => {
if let Err(e) = rows.run_once() {
let _ = self.writer.write_all(
format!("Error executing query: {e:?}\n").as_bytes(),
);
failed_rows += batch.len() as u64;
break;
}
}
Ok(turso_core::StepResult::Done) => {
success_rows += batch.len() as u64;
break;
}
Ok(turso_core::StepResult::Interrupt) => {
failed_rows += batch.len() as u64;
break;
}
Ok(turso_core::StepResult::Busy) => {
let _ = self.writer.write_all(b"database is busy\n");
failed_rows += batch.len() as u64;
break;
}
Ok(turso_core::StepResult::Row) => {
panic!("Unexpected row for INSERT");
}
Err(e) => {
let _ = self.writer.write_all(
format!("Error executing query: {e:?}\n").as_bytes(),
);
failed_rows += batch.len() as u64;
break;
}
}
}
} else {
success_rows += batch.len() as u64;
}
}
Err(e) => {
let _ = self
.writer
.write_all(format!("Error executing query: {e:?}\n").as_bytes());
failed_rows += batch.len() as u64;
}
}
}
@@ -152,13 +315,29 @@ impl<'a> ImportFile<'a> {
if args.verbose {
let _ = self.writer.write_all(
format!(
"Added {} rows with {} errors using {} lines of input\n",
"Added {} rows with {} errors using {} lines of input",
success_rows,
failed_rows,
success_rows + failed_rows
success_rows + failed_rows,
)
.as_bytes(),
);
}
}
}
// https://sqlite.org/lang_keywords.html
/// Opening/closing delimiter pairs that may wrap a quoted identifier:
/// `"name"`, `[name]` and `` `name` ``.
const QUOTE_PAIRS: &[(char, char)] = &[('"', '"'), ('[', ']'), ('`', '`')];

/// Normalizes an identifier by removing one layer of surrounding quotes and
/// lowercasing the result.
///
/// The identifier counts as quoted only when it starts with the opening
/// delimiter of one of [`QUOTE_PAIRS`] and ends with the matching closing
/// delimiter, where those are two distinct characters of the input. A lone
/// `"` or `` ` `` is therefore returned unchanged instead of being sliced
/// with an inverted range (which would panic).
///
/// Only the outermost delimiters are removed; any quote characters inside
/// the identifier are left untouched.
pub fn normalize_ident(identifier: &str) -> String {
    QUOTE_PAIRS
        .iter()
        .find_map(|&(start, end)| {
            // `strip_suffix` runs on what is left after `strip_prefix`, so a
            // single delimiter character can never satisfy both ends.
            identifier
                .strip_prefix(start)
                .and_then(|rest| rest.strip_suffix(end))
        })
        .unwrap_or(identifier)
        .to_lowercase()
}

View File

@@ -1,2 +1,3 @@
mydb*
hits.csv
hits_small.csv

View File

@@ -228,6 +228,30 @@ def test_import_csv_skip():
shell.quit()
def test_import_csv_create_table_from_header():
    """Importing a CSV into a table that does not exist creates it from the header row.

    Each step is a (test name, shell input, expected output) triple, run in order
    against a fresh in-memory database.
    """
    steps = [
        # Start from an empty in-memory database.
        ("open-memory", ".open :memory:", ""),
        # The target table does not exist yet, so the import should create it.
        (
            "import-csv-create-table",
            ".import --csv ./testing/test_files/test_w_header.csv auto_table",
            "",
        ),
        # Column names of the new table come from the CSV header.
        (
            "verify-auto-table-schema",
            ".schema auto_table",
            "CREATE TABLE auto_table (id, interesting_number, interesting_string);",
        ),
        # Only the data rows are inserted; the header row is not.
        (
            "verify-auto-table-data",
            "select * from auto_table;",
            "1|2.0|String'1\n3|4.0|String2",
        ),
    ]

    shell = TestTursoShell()
    for name, command, expected in steps:
        shell.run_test(name, command, expected)
    shell.quit()
def test_table_patterns():
shell = TestTursoShell()
shell.run_test("tables-pattern", ".tables us%", "users")
@@ -304,6 +328,7 @@ def main():
test_import_csv()
test_import_csv_verbose()
test_import_csv_skip()
test_import_csv_create_table_from_header()
test_table_patterns()
test_update_with_limit()
test_update_with_limit_and_offset()

View File

@@ -0,0 +1,3 @@
"id","interesting_number","interesting_string"
1,2.0,"String'1"
3,4.0,"String2"
1 id interesting_number interesting_string
2 1 2.0 String'1
3 3 4.0 String2