diff --git a/core/lib.rs b/core/lib.rs index d5b21bd3d..1ffff3f46 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -564,6 +564,9 @@ impl CaptureDataChangesMode { )) } } + pub fn has_updates(&self) -> bool { + matches!(self, CaptureDataChangesMode::Full { .. }) + } pub fn has_after(&self) -> bool { matches!( self, diff --git a/core/translate/alter.rs b/core/translate/alter.rs index fb95ec044..5d93c7492 100644 --- a/core/translate/alter.rs +++ b/core/translate/alter.rs @@ -13,7 +13,7 @@ use crate::{ LimboError, Result, SymbolTable, }; -use super::{schema::SQLITE_TABLEID, update::translate_update_with_after}; +use super::{schema::SQLITE_TABLEID, update::translate_update_for_schema_change}; pub fn translate_alter_table( alter: (ast::QualifiedName, ast::AlterTableBody), @@ -21,6 +21,7 @@ pub fn translate_alter_table( schema: &Schema, mut program: ProgramBuilder, connection: &Arc, + input: &str, ) -> Result { program.begin_write_operation(); let (table_name, alter_table) = alter; @@ -94,12 +95,13 @@ pub fn translate_alter_table( unreachable!(); }; - translate_update_with_after( + translate_update_for_schema_change( schema, &mut update, syms, program, connection, + input, |program| { let column_count = btree.columns.len(); let root_page = btree.root_page; @@ -206,12 +208,13 @@ pub fn translate_alter_table( unreachable!(); }; - translate_update_with_after( + translate_update_for_schema_change( schema, &mut update, syms, program, connection, + input, |program| { program.emit_insn(Insn::SetCookie { db: 0, diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index e9b72735b..60b7bc7b8 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -601,6 +601,7 @@ fn emit_delete_insns( rowid_reg, before_record_reg, None, + None, table_reference.table.get_name(), )?; } @@ -868,6 +869,15 @@ fn emit_update_insns( // we scan a column at a time, loading either the column's values, or the new value // from the Set expression, into registers so we can emit a MakeRecord and update the row. + + // we allocate 2C registers for "updates" as the structure of this column for CDC table is following: + // [C boolean values where true set for changed columns] [C values with updates where NULL is set for not-changed columns] + let cdc_updates_register = if program.capture_data_changes_mode().has_updates() { + Some(program.alloc_registers(2 * table_ref.columns().len())) + } else { + None + }; + let start = if is_virtual { beg + 2 } else { beg + 1 }; for (idx, table_column) in table_ref.columns().iter().enumerate() { let target_reg = start + idx; @@ -914,6 +924,27 @@ fn emit_update_insns( }); } } + + if let Some(cdc_updates_register) = cdc_updates_register { + let change_reg = cdc_updates_register + idx; + let value_reg = cdc_updates_register + table_ref.columns().len() + idx; + program.emit_bool(true, change_reg); + program.mark_last_insn_constant(); + let mut updated = false; + if let Some(ddl_query_for_cdc_update) = &plan.cdc_update_alter_statement { + if table_column.name.as_deref() == Some("sql") { + program.emit_string8(ddl_query_for_cdc_update.clone(), value_reg); + updated = true; + } + } + if !updated { + program.emit_insn(Insn::Copy { + src_reg: target_reg, + dst_reg: value_reg, + extra_amount: 0, + }); + } + } } else { let column_idx_in_index = index.as_ref().and_then(|(idx, _)| { idx.columns @@ -944,6 +975,15 @@ fn emit_update_insns( .unwrap_or(&cursor_id); program.emit_column(cursor_id, column_idx_in_index.unwrap_or(idx), target_reg); } + + if let Some(cdc_updates_register) = cdc_updates_register { + let change_bit_reg = cdc_updates_register + idx; + let value_reg = cdc_updates_register + table_ref.columns().len() + idx; + program.emit_bool(false, change_bit_reg); + program.mark_last_insn_constant(); + program.emit_null(value_reg, None); + program.mark_last_insn_constant(); + } } } @@ -1222,6 +1262,19 @@ fn emit_update_insns( None }; + let cdc_updates_record = if let Some(cdc_updates_register) = cdc_updates_register { + let record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: cdc_updates_register, + count: 2 * table_ref.columns().len(), + dest_reg: record_reg, + index_name: None, + }); + Some(record_reg) + } else { + None + }; + // emit actual CDC instructions for write to the CDC table if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { let cdc_rowid_before_reg = @@ -1235,6 +1288,7 @@ fn emit_update_insns( cdc_rowid_before_reg, cdc_before_reg, None, + None, table_ref.table.get_name(), )?; emit_cdc_insns( @@ -1245,6 +1299,7 @@ fn emit_update_insns( cdc_rowid_after_reg, cdc_after_reg, None, + None, table_ref.table.get_name(), )?; } else { @@ -1256,6 +1311,7 @@ fn emit_update_insns( cdc_rowid_before_reg, cdc_before_reg, cdc_after_reg, + cdc_updates_record, table_ref.table.get_name(), )?; } @@ -1377,10 +1433,11 @@ pub fn emit_cdc_insns( rowid_reg: usize, before_record_reg: Option, after_record_reg: Option, + updates_record_reg: Option, table_name: &str, ) -> Result<()> { - // (change_id INTEGER PRIMARY KEY AUTOINCREMENT, change_time INTEGER, change_type INTEGER, table_name TEXT, id, before BLOB, after BLOB) - let turso_cdc_registers = program.alloc_registers(7); + // (change_id INTEGER PRIMARY KEY AUTOINCREMENT, change_time INTEGER, change_type INTEGER, table_name TEXT, id, before BLOB, after BLOB, updates BLOB) + let turso_cdc_registers = program.alloc_registers(8); program.emit_insn(Insn::Null { dest: turso_cdc_registers, dest_end: None, @@ -1441,6 +1498,17 @@ pub fn emit_cdc_insns( program.mark_last_insn_constant(); } + if let Some(updates_record_reg) = updates_record_reg { + program.emit_insn(Insn::Copy { + src_reg: updates_record_reg, + dst_reg: turso_cdc_registers + 7, + extra_amount: 0, + }); + } else { + program.emit_null(turso_cdc_registers + 7, None); + program.mark_last_insn_constant(); + } + let rowid_reg = program.alloc_register(); program.emit_insn(Insn::NewRowid { cursor: cdc_cursor_id, @@ -1451,7 +1519,7 @@ pub fn emit_cdc_insns( let record_reg = program.alloc_register(); program.emit_insn(Insn::MakeRecord { start_reg: turso_cdc_registers, - count: 7, + count: 8, dest_reg: record_reg, index_name: None, }); diff --git a/core/translate/index.rs b/core/translate/index.rs index 0a6175f46..03f5ef662 100644 --- a/core/translate/index.rs +++ b/core/translate/index.rs @@ -456,6 +456,7 @@ pub fn translate_drop_index( row_id_reg, before_record_reg, None, + None, SQLITE_TABLEID, )?; } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index eabf054c9..9cc051354 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -571,6 +571,7 @@ pub fn translate_insert( rowid_and_columns_start_register, None, after_record_reg, + None, table_name.as_str(), )?; } diff --git a/core/translate/mod.rs b/core/translate/mod.rs index 8b47760ce..36359c657 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -98,7 +98,7 @@ pub fn translate( connection.clone(), program, )?, - stmt => translate_inner(schema, stmt, syms, program, &connection)?, + stmt => translate_inner(schema, stmt, syms, program, &connection, input)?, }; program.epilogue(schema); @@ -115,6 +115,7 @@ pub fn translate_inner( syms: &SymbolTable, program: ProgramBuilder, connection: &Arc, + input: &str, ) -> Result { let is_write = matches!( stmt, @@ -140,7 +141,7 @@ pub fn translate_inner( let mut program = match stmt { ast::Stmt::AlterTable(alter) => { - translate_alter_table(*alter, syms, schema, program, connection)? + translate_alter_table(*alter, syms, schema, program, connection, input)? } ast::Stmt::Analyze(_) => bail_parse_error!("ANALYZE not supported yet"), ast::Stmt::Attach { expr, db_name, key } => { diff --git a/core/translate/plan.rs b/core/translate/plan.rs index 12a180bff..a8619317f 100644 --- a/core/translate/plan.rs +++ b/core/translate/plan.rs @@ -397,6 +397,9 @@ pub struct UpdatePlan { pub indexes_to_update: Vec>, // If the table's rowid alias is used, gather all the target rowids into an ephemeral table, and then use that table as the single JoinedTable for the actual UPDATE loop. pub ephemeral_plan: Option, + // For ALTER TABLE turso-db emits appropriate DDL statement in the "updates" cell of CDC table + // This field is present only for update plan created for ALTER TABLE when CDC mode has "updates" values + pub cdc_update_alter_statement: Option, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 114a66bb1..5ad994633 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -692,6 +692,14 @@ fn turso_cdc_table_columns() -> Vec { }), constraints: vec![], }, + ast::ColumnDefinition { + col_name: ast::Name::from_str("updates"), + col_type: Some(ast::Type { + name: "BLOB".to_string(), + size: None, + }), + constraints: vec![], + }, ] } diff --git a/core/translate/schema.rs b/core/translate/schema.rs index 786e2684f..c61f1008d 100644 --- a/core/translate/schema.rs +++ b/core/translate/schema.rs @@ -268,6 +268,7 @@ pub fn emit_schema_entry( rowid_reg, None, after_record_reg, + None, SQLITE_TABLEID, )?; } @@ -773,6 +774,7 @@ pub fn translate_drop_table( row_id_reg, before_record_reg, None, + None, SQLITE_TABLEID, )?; program.resolve_label(skip_cdc_label, program.offset()); diff --git a/core/translate/update.rs b/core/translate/update.rs index 6f8df6cc3..296166c8e 100644 --- a/core/translate/update.rs +++ b/core/translate/update.rs @@ -72,15 +72,23 @@ pub fn translate_update( Ok(program) } -pub fn translate_update_with_after( +pub fn translate_update_for_schema_change( schema: &Schema, body: &mut Update, syms: &SymbolTable, mut program: ProgramBuilder, connection: &Arc, + ddl_query: &str, after: impl FnOnce(&mut ProgramBuilder), ) -> crate::Result { let mut plan = prepare_update_plan(&mut program, schema, body, connection)?; + + if let Plan::Update(plan) = &mut plan { + if program.capture_data_changes_mode().has_updates() { + plan.cdc_update_alter_statement = Some(ddl_query.to_string()); + } + } + optimize_plan(&mut plan, schema)?; // TODO: freestyling these numbers let opts = ProgramBuilderOpts { @@ -368,6 +376,7 @@ pub fn prepare_update_plan( contains_constant_false_condition: false, indexes_to_update, ephemeral_plan, + cdc_update_alter_statement: None, })) } diff --git a/tests/integration/functions/test_cdc.rs b/tests/integration/functions/test_cdc.rs index bdce23af6..cec81d11c 100644 --- a/tests/integration/functions/test_cdc.rs +++ b/tests/integration/functions/test_cdc.rs @@ -44,6 +44,7 @@ fn test_cdc_simple_id() { Value::Integer(10), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(2), @@ -53,6 +54,7 @@ fn test_cdc_simple_id() { Value::Integer(5), Value::Null, Value::Null, + Value::Null, ] ] ); @@ -99,6 +101,7 @@ fn test_cdc_simple_before() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(2), @@ -108,6 +111,7 @@ fn test_cdc_simple_before() { Value::Integer(3), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(3), @@ -117,6 +121,7 @@ fn test_cdc_simple_before() { Value::Integer(1), Value::Blob(record([Value::Integer(1), Value::Integer(2)])), Value::Null, + Value::Null, ], vec![ Value::Integer(4), @@ -126,6 +131,7 @@ fn test_cdc_simple_before() { Value::Integer(3), Value::Blob(record([Value::Integer(3), Value::Integer(4)])), Value::Null, + Value::Null, ], vec![ Value::Integer(5), @@ -135,6 +141,7 @@ fn test_cdc_simple_before() { Value::Integer(1), Value::Blob(record([Value::Integer(1), Value::Integer(3)])), Value::Null, + Value::Null, ] ] ); @@ -165,6 +172,7 @@ fn test_cdc_simple_after() { Value::Integer(1), Value::Null, Value::Blob(record([Value::Integer(1), Value::Integer(2)])), + Value::Null, ], vec![ Value::Integer(2), @@ -174,6 +182,7 @@ fn test_cdc_simple_after() { Value::Integer(3), Value::Null, Value::Blob(record([Value::Integer(3), Value::Integer(4)])), + Value::Null, ], vec![ Value::Integer(3), @@ -183,6 +192,7 @@ fn test_cdc_simple_after() { Value::Integer(1), Value::Null, Value::Blob(record([Value::Integer(1), Value::Integer(3)])), + Value::Null, ], vec![ Value::Integer(4), @@ -192,6 +202,7 @@ fn test_cdc_simple_after() { Value::Integer(3), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(5), @@ -201,6 +212,7 @@ fn test_cdc_simple_after() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ] ] ); @@ -231,6 +243,7 @@ fn test_cdc_simple_full() { Value::Integer(1), Value::Null, Value::Blob(record([Value::Integer(1), Value::Integer(2)])), + Value::Null, ], vec![ Value::Integer(2), @@ -240,6 +253,7 @@ fn test_cdc_simple_full() { Value::Integer(3), Value::Null, Value::Blob(record([Value::Integer(3), Value::Integer(4)])), + Value::Null, ], vec![ Value::Integer(3), @@ -249,6 +263,12 @@ fn test_cdc_simple_full() { Value::Integer(1), Value::Blob(record([Value::Integer(1), Value::Integer(2)])), Value::Blob(record([Value::Integer(1), Value::Integer(3)])), + Value::Blob(record([ + Value::Integer(0), + Value::Integer(1), + Value::Null, + Value::Integer(3) + ])), ], vec![ Value::Integer(4), @@ -258,6 +278,7 @@ fn test_cdc_simple_full() { Value::Integer(3), Value::Blob(record([Value::Integer(3), Value::Integer(4)])), Value::Null, + Value::Null, ], vec![ Value::Integer(5), @@ -267,6 +288,7 @@ fn test_cdc_simple_full() { Value::Integer(1), Value::Blob(record([Value::Integer(1), Value::Integer(3)])), Value::Null, + Value::Null, ] ] ); @@ -307,6 +329,7 @@ fn test_cdc_crud() { Value::Integer(20), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(2), @@ -316,6 +339,7 @@ fn test_cdc_crud() { Value::Integer(10), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(3), @@ -325,6 +349,7 @@ fn test_cdc_crud() { Value::Integer(5), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(4), @@ -334,6 +359,7 @@ fn test_cdc_crud() { Value::Integer(5), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(5), @@ -343,6 +369,7 @@ fn test_cdc_crud() { Value::Integer(10), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(6), @@ -352,6 +379,7 @@ fn test_cdc_crud() { Value::Integer(20), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(7), @@ -361,6 +389,7 @@ fn test_cdc_crud() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(8), @@ -370,6 +399,7 @@ fn test_cdc_crud() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(9), @@ -379,6 +409,7 @@ fn test_cdc_crud() { Value::Integer(2), Value::Null, Value::Null, + Value::Null, ], ] ); @@ -422,6 +453,7 @@ fn test_cdc_failed_op() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(2), @@ -431,6 +463,7 @@ fn test_cdc_failed_op() { Value::Integer(2), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(3), @@ -440,6 +473,7 @@ fn test_cdc_failed_op() { Value::Integer(6), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(4), @@ -449,6 +483,7 @@ fn test_cdc_failed_op() { Value::Integer(7), Value::Null, Value::Null, + Value::Null, ], ] ); @@ -508,6 +543,7 @@ fn test_cdc_uncaptured_connection() { Value::Integer(2), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(2), @@ -517,6 +553,7 @@ fn test_cdc_uncaptured_connection() { Value::Integer(4), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(3), @@ -526,6 +563,7 @@ fn test_cdc_uncaptured_connection() { Value::Integer(6), Value::Null, Value::Null, + Value::Null, ], ] ); @@ -564,6 +602,7 @@ fn test_cdc_custom_table() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(2), @@ -573,6 +612,7 @@ fn test_cdc_custom_table() { Value::Integer(2), Value::Null, Value::Null, + Value::Null, ], ] ); @@ -613,6 +653,7 @@ fn test_cdc_ignore_changes_in_cdc_table() { Value::Integer(2), Value::Null, Value::Null, + Value::Null, ],] ); } @@ -654,6 +695,7 @@ fn test_cdc_transaction() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(2), @@ -663,6 +705,7 @@ fn test_cdc_transaction() { Value::Integer(2), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(3), @@ -672,6 +715,7 @@ fn test_cdc_transaction() { Value::Integer(3), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(4), @@ -681,6 +725,7 @@ fn test_cdc_transaction() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(5), @@ -690,6 +735,7 @@ fn test_cdc_transaction() { Value::Integer(2), Value::Null, Value::Null, + Value::Null, ], ] ); @@ -731,6 +777,7 @@ fn test_cdc_independent_connections() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ]] ); let rows = @@ -745,6 +792,7 @@ fn test_cdc_independent_connections() { Value::Integer(2), Value::Null, Value::Null, + Value::Null, ]] ); } @@ -796,6 +844,7 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() { Value::Integer(2), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(3), @@ -805,6 +854,7 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ] ] ); @@ -821,6 +871,7 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() { Value::Integer(4), Value::Null, Value::Null, + Value::Null, ], vec![ Value::Integer(3), @@ -830,6 +881,7 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() { Value::Integer(1), Value::Null, Value::Null, + Value::Null, ] ] ); @@ -914,6 +966,7 @@ fn test_cdc_schema_changes() { "CREATE TABLE t (x, y, z UNIQUE, q, PRIMARY KEY (x, y))".to_string() ) ])), + Value::Null, ], vec![ Value::Integer(2), @@ -929,6 +982,7 @@ fn test_cdc_schema_changes() { Value::Integer(6), Value::Text("CREATE TABLE q (a, b, c)".to_string()) ])), + Value::Null, ], vec![ Value::Integer(3), @@ -944,6 +998,7 @@ fn test_cdc_schema_changes() { Value::Integer(7), Value::Text("CREATE INDEX t_q ON t (q)".to_string()) ])), + Value::Null, ], vec![ Value::Integer(4), @@ -959,6 +1014,7 @@ fn test_cdc_schema_changes() { Value::Integer(8), Value::Text("CREATE INDEX q_abc ON q (a, b, c)".to_string()) ])), + Value::Null, ], vec![ Value::Integer(5), @@ -976,6 +1032,7 @@ fn test_cdc_schema_changes() { ) ])), Value::Null, + Value::Null, ], vec![ Value::Integer(6), @@ -991,6 +1048,7 @@ fn test_cdc_schema_changes() { Value::Text("CREATE INDEX q_abc ON q (a, b, c)".to_string()) ])), Value::Null, + Value::Null, ] ] ); @@ -1027,6 +1085,7 @@ fn test_cdc_schema_changes_alter_table() { "CREATE TABLE t (x, y, z UNIQUE, q, PRIMARY KEY (x, y))".to_string() ) ])), + Value::Null, ], vec![ Value::Integer(2), @@ -1052,6 +1111,18 @@ fn test_cdc_schema_changes_alter_table() { "CREATE TABLE t (x PRIMARY KEY, y PRIMARY KEY, z UNIQUE)".to_string() ) ])), + Value::Blob(record([ + Value::Integer(0), + Value::Integer(0), + Value::Integer(0), + Value::Integer(0), + Value::Integer(1), + Value::Null, + Value::Null, + Value::Null, + Value::Null, + Value::Text("ALTER TABLE t DROP COLUMN q".to_string()) + ])), ], vec![ Value::Integer(3), @@ -1077,6 +1148,18 @@ fn test_cdc_schema_changes_alter_table() { "CREATE TABLE t (x PRIMARY KEY, y PRIMARY KEY, z UNIQUE, t)".to_string() ) ])), + Value::Blob(record([ + Value::Integer(0), + Value::Integer(0), + Value::Integer(0), + Value::Integer(0), + Value::Integer(1), + Value::Null, + Value::Null, + Value::Null, + Value::Null, + Value::Text("ALTER TABLE t ADD COLUMN t".to_string()) + ])), ], ] ); diff --git a/tests/integration/query_processing/test_multi_thread.rs b/tests/integration/query_processing/test_multi_thread.rs index c509fe0da..94b33712b 100644 --- a/tests/integration/query_processing/test_multi_thread.rs +++ b/tests/integration/query_processing/test_multi_thread.rs @@ -43,6 +43,41 @@ fn test_schema_reprepare() { assert_eq!(row, (20, 30)); } +#[test] +fn test_schema_reprepare_write() { + let tmp_db = TempDatabase::new_empty(false); + let conn1 = tmp_db.connect_limbo(); + conn1.execute("CREATE TABLE t(x, y, z)").unwrap(); + let conn2 = tmp_db.connect_limbo(); + let mut stmt = conn2.prepare("INSERT INTO t(y, z) VALUES (1, 2)").unwrap(); + let mut stmt2 = conn2.prepare("INSERT INTO t(y, z) VALUES (3, 4)").unwrap(); + conn1.execute("ALTER TABLE t DROP COLUMN x").unwrap(); + + loop { + match stmt.step().unwrap() { + turso_core::StepResult::Done => { + break; + } + turso_core::StepResult::IO => { + stmt.run_once().unwrap(); + } + step => panic!("unexpected step result {step:?}"), + } + } + + loop { + match stmt2.step().unwrap() { + turso_core::StepResult::Done => { + break; + } + turso_core::StepResult::IO => { + stmt2.run_once().unwrap(); + } + step => panic!("unexpected step result {step:?}"), + } + } +} + #[test] #[ignore] fn test_create_multiple_connections() -> anyhow::Result<()> {