From 9129991b626140272d9f51cfa5234d5ed69e78a6 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Tue, 8 Jul 2025 00:38:53 +0400 Subject: [PATCH 01/15] add id,before,after,full modes --- core/lib.rs | 34 +++++++++++++++++++++++++++++----- core/translate/emitter.rs | 38 +++++++++++++++++++++++++++++++++++--- core/translate/insert.rs | 32 ++++++++++++++++++++------------ core/translate/pragma.rs | 16 ++++++++++++++++ 4 files changed, 100 insertions(+), 20 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 7f9b8a04f..130503ede 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -443,7 +443,10 @@ fn get_schema_version(conn: &Arc) -> Result { #[derive(Debug, Clone, Eq, PartialEq)] pub enum CaptureDataChangesMode { Off, - RowidOnly { table: String }, + Id { table: String }, + Before { table: String }, + After { table: String }, + Full { table: String }, } impl CaptureDataChangesMode { @@ -453,22 +456,43 @@ impl CaptureDataChangesMode { .unwrap_or((value, TURSO_CDC_DEFAULT_TABLE_NAME)); match mode { "off" => Ok(CaptureDataChangesMode::Off), - "rowid-only" => Ok(CaptureDataChangesMode::RowidOnly { table: table.to_string() }), + "id" => Ok(CaptureDataChangesMode::Id { table: table.to_string() }), + "before" => Ok(CaptureDataChangesMode::Before { table: table.to_string() }), + "after" => Ok(CaptureDataChangesMode::After { table: table.to_string() }), + "full" => Ok(CaptureDataChangesMode::Full { table: table.to_string() }), _ => Err(LimboError::InvalidArgument( - "unexpected pragma value: expected '' or ',' parameter where mode is one of off|rowid-only".to_string(), + "unexpected pragma value: expected '' or ',' parameter where mode is one of off|id|before|after|full".to_string(), )) } } + pub fn has_after(&self) -> bool { + matches!( + self, + CaptureDataChangesMode::After { .. } | CaptureDataChangesMode::Full { .. } + ) + } + pub fn has_before(&self) -> bool { + matches!( + self, + CaptureDataChangesMode::Before { .. } | CaptureDataChangesMode::Full { .. } + ) + } pub fn mode_name(&self) -> &str { match self { CaptureDataChangesMode::Off => "off", - CaptureDataChangesMode::RowidOnly { .. } => "rowid-only", + CaptureDataChangesMode::Id { .. } => "id", + CaptureDataChangesMode::Before { .. } => "before", + CaptureDataChangesMode::After { .. } => "after", + CaptureDataChangesMode::Full { .. } => "full", } } pub fn table(&self) -> Option<&str> { match self { CaptureDataChangesMode::Off => None, - CaptureDataChangesMode::RowidOnly { table } => Some(table.as_str()), + CaptureDataChangesMode::Id { table } + | CaptureDataChangesMode::Before { table } + | CaptureDataChangesMode::After { table } + | CaptureDataChangesMode::Full { table } => Some(table.as_str()), } } } diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 0b50c40a6..db2d29365 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -582,6 +582,8 @@ fn emit_delete_insns( OperationMode::DELETE, turso_cdc_cursor_id, rowid_reg, + None, + None, table_reference.table.get_name(), )?; } @@ -1110,6 +1112,8 @@ fn emit_update_insns( OperationMode::DELETE, cdc_cursor_id, rowid_reg, + None, + None, table_ref.table.get_name(), )?; program.emit_insn(Insn::Copy { @@ -1125,6 +1129,8 @@ fn emit_update_insns( OperationMode::INSERT, cdc_cursor_id, rowid_reg, + None, + None, table_ref.table.get_name(), )?; } else { @@ -1139,6 +1145,8 @@ fn emit_update_insns( OperationMode::UPDATE, cdc_cursor_id, rowid_reg, + None, + None, table_ref.table.get_name(), )?; } @@ -1189,10 +1197,12 @@ pub fn emit_cdc_insns( operation_mode: OperationMode, cdc_cursor_id: usize, rowid_reg: usize, + before_record_reg: Option, + after_record_reg: Option, table_name: &str, ) -> Result<()> { - // (operation_id INTEGER PRIMARY KEY AUTOINCREMENT, operation_time INTEGER, operation_type INTEGER, table_name TEXT, id) - let turso_cdc_registers = program.alloc_registers(5); + // (operation_id INTEGER PRIMARY KEY AUTOINCREMENT, operation_time INTEGER, operation_type INTEGER, table_name TEXT, id, before BLOB, after BLOB) + let turso_cdc_registers = program.alloc_registers(7); program.emit_insn(Insn::Null { dest: turso_cdc_registers, dest_end: None, @@ -1231,6 +1241,28 @@ pub fn emit_cdc_insns( amount: 0, }); + if let Some(before_record_reg) = before_record_reg { + program.emit_insn(Insn::Copy { + src_reg: before_record_reg, + dst_reg: turso_cdc_registers + 5, + amount: 0, + }); + } else { + program.emit_null(turso_cdc_registers + 5, None); + program.mark_last_insn_constant(); + } + + if let Some(after_record_reg) = after_record_reg { + program.emit_insn(Insn::Copy { + src_reg: after_record_reg, + dst_reg: turso_cdc_registers + 6, + amount: 0, + }); + } else { + program.emit_null(turso_cdc_registers + 6, None); + program.mark_last_insn_constant(); + } + let rowid_reg = program.alloc_register(); program.emit_insn(Insn::NewRowid { cursor: cdc_cursor_id, @@ -1241,7 +1273,7 @@ pub fn emit_cdc_insns( let record_reg = program.alloc_register(); program.emit_insn(Insn::MakeRecord { start_reg: turso_cdc_registers, - count: 5, + count: 7, dest_reg: record_reg, index_name: None, }); diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 71127d4cb..d00f8d846 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -118,6 +118,7 @@ pub fn translate_insert( let loop_start_label = program.allocate_label(); let cdc_table = program.capture_data_changes_mode().table(); + let cdc_has_after = program.capture_data_changes_mode().has_after(); let cdc_table = if let Some(cdc_table) = cdc_table { if table.get_name() != cdc_table { let Some(turso_cdc_table) = schema.get_table(cdc_table) else { @@ -444,18 +445,6 @@ pub fn translate_insert( _ => (), } - // Write record to the turso_cdc table if necessary - if let Some((cdc_cursor_id, _)) = &cdc_table { - emit_cdc_insns( - &mut program, - &resolver, - OperationMode::INSERT, - *cdc_cursor_id, - rowid_reg, - &table_name.0, - )?; - } - let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?; for index_col_mapping in index_col_mappings { // find which cursor we opened earlier for this index @@ -575,6 +564,25 @@ pub fn translate_insert( index_name: None, }); + // Write record to the turso_cdc table if necessary + if let Some((cdc_cursor_id, _)) = &cdc_table { + let after_record_reg = if cdc_has_after { + Some(record_register) + } else { + None + }; + emit_cdc_insns( + &mut program, + &resolver, + OperationMode::INSERT, + *cdc_cursor_id, + rowid_reg, + None, + after_record_reg, + &table_name.0, + )?; + } + program.emit_insn(Insn::Insert { cursor: cursor_id, key_reg: rowid_reg, diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 0f8fb5b88..45f554fc9 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -510,5 +510,21 @@ fn turso_cdc_table_columns() -> Vec { col_type: None, constraints: vec![], }, + ast::ColumnDefinition { + col_name: ast::Name("before".to_string()), + col_type: Some(ast::Type { + name: "BLOB".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("after".to_string()), + col_type: Some(ast::Type { + name: "BLOB".to_string(), + size: None, + }), + constraints: vec![], + }, ] } From b258c10c9a8aa82cbc7680c9f5d309ba9fa50fe0 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 10 Jul 2025 15:24:48 +0400 Subject: [PATCH 02/15] generate before/after row values in modification statements --- core/translate/emitter.rs | 52 +++++++++++++++++++++++++++++++++------ core/translate/insert.rs | 2 +- 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index db2d29365..dbfd2b3fd 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -149,7 +149,7 @@ pub struct TranslateCtx<'a> { /// - First: all `GROUP BY` expressions, in the order they appear in the `GROUP BY` clause. /// - Then: remaining non-aggregate expressions that are not part of `GROUP BY`. pub non_aggregate_expressions: Vec<(&'a Expr, bool)>, - /// Cursor id for turso_cdc table (if capture_data_changes=on is set and query can modify the data) + /// Cursor id for cdc table (if capture_data_changes PRAGMA is set and query can modify the data) pub cdc_cursor_id: Option, } @@ -570,19 +570,36 @@ fn emit_delete_insns( } } - if let Some(turso_cdc_cursor_id) = t_ctx.cdc_cursor_id { + if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { let rowid_reg = program.alloc_register(); program.emit_insn(Insn::RowId { cursor_id: main_table_cursor_id, dest: rowid_reg, }); + let cdc_has_before = program.capture_data_changes_mode().has_before(); + let before_record_reg = if cdc_has_before { + let columns = table_reference.columns(); + let columns_reg = program.alloc_registers(columns.len() + 1); + for i in 0..columns.len() { + program.emit_column(main_table_cursor_id, i, columns_reg + 1 + i); + } + program.emit_insn(Insn::MakeRecord { + start_reg: columns_reg + 1, + count: columns.len(), + dest_reg: columns_reg, + index_name: None, + }); + Some(columns_reg) + } else { + None + }; emit_cdc_insns( program, &t_ctx.resolver, OperationMode::DELETE, - turso_cdc_cursor_id, + cdc_cursor_id, rowid_reg, - None, + before_record_reg, None, table_reference.table.get_name(), )?; @@ -1101,6 +1118,25 @@ fn emit_update_insns( if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { let rowid_reg = program.alloc_register(); + let has_after = program.capture_data_changes_mode().has_after(); + let has_before = program.capture_data_changes_mode().has_before(); + let before_record_reg = if has_before { + let columns = table_ref.columns(); + let columns_reg = program.alloc_registers(columns.len() + 1); + for i in 0..columns.len() { + program.emit_column(cursor_id, i, columns_reg + 1 + i); + } + program.emit_insn(Insn::MakeRecord { + start_reg: columns_reg + 1, + count: columns.len(), + dest_reg: columns_reg, + index_name: None, + }); + Some(columns_reg) + } else { + None + }; + let after_record_reg = if has_after { Some(record_reg) } else { None }; if has_user_provided_rowid { program.emit_insn(Insn::RowId { cursor_id, @@ -1112,7 +1148,7 @@ fn emit_update_insns( OperationMode::DELETE, cdc_cursor_id, rowid_reg, - None, + before_record_reg, None, table_ref.table.get_name(), )?; @@ -1130,7 +1166,7 @@ fn emit_update_insns( cdc_cursor_id, rowid_reg, None, - None, + after_record_reg, table_ref.table.get_name(), )?; } else { @@ -1145,8 +1181,8 @@ fn emit_update_insns( OperationMode::UPDATE, cdc_cursor_id, rowid_reg, - None, - None, + before_record_reg, + after_record_reg, table_ref.table.get_name(), )?; } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index d00f8d846..de5c49769 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -118,7 +118,6 @@ pub fn translate_insert( let loop_start_label = program.allocate_label(); let cdc_table = program.capture_data_changes_mode().table(); - let cdc_has_after = program.capture_data_changes_mode().has_after(); let cdc_table = if let Some(cdc_table) = cdc_table { if table.get_name() != cdc_table { let Some(turso_cdc_table) = schema.get_table(cdc_table) else { @@ -566,6 +565,7 @@ pub fn translate_insert( // Write record to the turso_cdc table if necessary if let Some((cdc_cursor_id, _)) = &cdc_table { + let cdc_has_after = program.capture_data_changes_mode().has_after(); let after_record_reg = if cdc_has_after { Some(record_register) } else { From fabb00f385220c31755026c8aa7906d680b09ae7 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 10 Jul 2025 15:28:23 +0400 Subject: [PATCH 03/15] fix test --- core/translate/emitter.rs | 4 +- tests/integration/functions/test_cdc.rs | 152 +++++++++++++++++------- 2 files changed, 110 insertions(+), 46 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index dbfd2b3fd..b33676047 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -1157,7 +1157,7 @@ fn emit_update_insns( "rowid_set_clause_reg must be set because has_user_provided_rowid is true", ), dst_reg: rowid_reg, - amount: 1, + amount: 0, }); emit_cdc_insns( program, @@ -1173,7 +1173,7 @@ fn emit_update_insns( program.emit_insn(Insn::Copy { src_reg: rowid_set_clause_reg.unwrap_or(beg), dst_reg: rowid_reg, - amount: 1, + amount: 0, }); emit_cdc_insns( program, diff --git a/tests/integration/functions/test_cdc.rs b/tests/integration/functions/test_cdc.rs index e69751b68..dd80d1f6e 100644 --- a/tests/integration/functions/test_cdc.rs +++ b/tests/integration/functions/test_cdc.rs @@ -17,7 +17,7 @@ fn replace_column_with_null(rows: Vec>, column: usize) -> Vec Date: Fri, 11 Jul 2025 07:10:51 -0700 Subject: [PATCH 04/15] add basic cdc tests for new modes --- core/types.rs | 33 ++-- tests/integration/functions/test_cdc.rs | 217 +++++++++++++++++++++++- 2 files changed, 237 insertions(+), 13 deletions(-) diff --git a/core/types.rs b/core/types.rs index 0d866bac3..a3facffc6 100644 --- a/core/types.rs +++ b/core/types.rs @@ -855,19 +855,29 @@ impl ImmutableRecord { cursor.get_values(self).unwrap_or_default() } - pub fn from_registers<'a>( - registers: impl IntoIterator + Copy, + pub fn from_registers<'a, I: Iterator + Clone>( + // we need to accept both &[Register] and &[&Register] values - that's why non-trivial signature + // + // std::slice::Iter under the hood just stores pointer and length of slice and also implements a Clone which just copy those meta-values + // (without copying the data itself) + registers: impl IntoIterator, len: usize, ) -> Self { - let mut values = Vec::with_capacity(len); + Self::from_values(registers.into_iter().map(|x| x.get_owned_value()), len) + } + + pub fn from_values<'a>( + values: impl IntoIterator + Clone, + len: usize, + ) -> Self { + let mut ref_values = Vec::with_capacity(len); let mut serials = Vec::with_capacity(len); let mut size_header = 0; let mut size_values = 0; let mut serial_type_buf = [0; 9]; // write serial types - for value in registers { - let value = value.get_owned_value(); + for value in values.clone() { let serial_type = SerialType::from(value); let n = write_varint(&mut serial_type_buf[0..], serial_type.into()); serials.push((serial_type_buf, n)); @@ -916,15 +926,14 @@ impl ImmutableRecord { } // write content - for value in registers { - let value = value.get_owned_value(); + for value in values { let start_offset = writer.pos; match value { Value::Null => { - values.push(RefValue::Null); + ref_values.push(RefValue::Null); } Value::Integer(i) => { - values.push(RefValue::Integer(*i)); + ref_values.push(RefValue::Integer(*i)); let serial_type = SerialType::from(value); match serial_type.kind() { SerialTypeKind::ConstInt0 | SerialTypeKind::ConstInt1 => {} @@ -940,7 +949,7 @@ impl ImmutableRecord { } } Value::Float(f) => { - values.push(RefValue::Float(*f)); + ref_values.push(RefValue::Float(*f)); writer.extend_from_slice(&f.to_be_bytes()) } Value::Text(t) => { @@ -952,14 +961,14 @@ impl ImmutableRecord { value: RawSlice::new(ptr, len), subtype: t.subtype.clone(), }); - values.push(value); + ref_values.push(value); } Value::Blob(b) => { writer.extend_from_slice(b); let end_offset = writer.pos; let len = end_offset - start_offset; let ptr = unsafe { writer.buf.as_ptr().add(start_offset) }; - values.push(RefValue::Blob(RawSlice::new(ptr, len))); + ref_values.push(RefValue::Blob(RawSlice::new(ptr, len))); } }; } diff --git a/tests/integration/functions/test_cdc.rs b/tests/integration/functions/test_cdc.rs index dd80d1f6e..d9ed347c7 100644 --- a/tests/integration/functions/test_cdc.rs +++ b/tests/integration/functions/test_cdc.rs @@ -1,4 +1,5 @@ use rusqlite::types::Value; +use turso_core::types::ImmutableRecord; use crate::common::{limbo_exec_rows, TempDatabase}; @@ -14,7 +15,7 @@ fn replace_column_with_null(rows: Vec>, column: usize) -> Vec(values: [Value; N]) -> Vec { + let values = values + .into_iter() + .map(|x| match x { + Value::Null => turso_core::Value::Null, + Value::Integer(x) => turso_core::Value::Integer(x), + Value::Real(x) => turso_core::Value::Float(x), + Value::Text(x) => turso_core::Value::Text(turso_core::types::Text::new(&x)), + Value::Blob(x) => turso_core::Value::Blob(x), + }) + .collect::>(); + ImmutableRecord::from_values(&values, values.len()) + .get_payload() + .to_vec() +} + +#[test] +fn test_cdc_simple_before() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('before')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1, 2), (3, 4)").unwrap(); + conn.execute("UPDATE t SET y = 3 WHERE x = 1").unwrap(); + conn.execute("DELETE FROM t WHERE x = 3").unwrap(); + conn.execute("DELETE FROM t WHERE x = 1").unwrap(); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Null, + Value::Null, + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Null, + Value::Null, + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(0), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Blob(record([Value::Null, Value::Integer(2)])), + Value::Null, + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Blob(record([Value::Null, Value::Integer(4)])), + Value::Null, + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Blob(record([Value::Null, Value::Integer(3)])), + Value::Null, + ] + ] + ); +} + +#[test] +fn test_cdc_simple_after() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('after')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1, 2), (3, 4)").unwrap(); + conn.execute("UPDATE t SET y = 3 WHERE x = 1").unwrap(); + conn.execute("DELETE FROM t WHERE x = 3").unwrap(); + conn.execute("DELETE FROM t WHERE x = 1").unwrap(); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Null, + Value::Blob(record([Value::Null, Value::Integer(2)])), + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Null, + Value::Blob(record([Value::Null, Value::Integer(4)])), + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(0), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Null, + Value::Blob(record([Value::Null, Value::Integer(3)])), + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Null, + Value::Null, + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Null, + Value::Null, + ] + ] + ); +} + +#[test] +fn test_cdc_simple_full() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('full')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1, 2), (3, 4)").unwrap(); + conn.execute("UPDATE t SET y = 3 WHERE x = 1").unwrap(); + conn.execute("DELETE FROM t WHERE x = 3").unwrap(); + conn.execute("DELETE FROM t WHERE x = 1").unwrap(); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Null, + Value::Blob(record([Value::Null, Value::Integer(2)])), + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Null, + Value::Blob(record([Value::Null, Value::Integer(4)])), + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(0), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Blob(record([Value::Null, Value::Integer(2)])), + Value::Blob(record([Value::Null, Value::Integer(3)])), + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Blob(record([Value::Null, Value::Integer(4)])), + Value::Null, + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Blob(record([Value::Null, Value::Integer(3)])), + Value::Null, + ] + ] + ); +} + #[test] fn test_cdc_crud() { let db = TempDatabase::new_empty(false); From 54098126108e95264d98e950ad553e137d059272 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 11 Jul 2025 13:20:14 -0700 Subject: [PATCH 05/15] properly implement generation of before/after records for new modes --- core/translate/emitter.rs | 218 +++++++++++++++--------- core/translate/insert.rs | 27 +-- tests/integration/functions/test_cdc.rs | 24 +-- 3 files changed, 168 insertions(+), 101 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index b33676047..5116be14e 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -22,7 +22,7 @@ use super::select::emit_simple_count; use super::subquery::emit_subqueries; use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY; use crate::function::Func; -use crate::schema::Schema; +use crate::schema::{Schema, Table}; use crate::translate::compound_select::emit_program_for_compound_select; use crate::translate::plan::{DeletePlan, Plan, QueryDestination, Search}; use crate::translate::values::emit_values; @@ -570,6 +570,7 @@ fn emit_delete_insns( } } + // Emit update in the CDC table if necessary (before DELETE updated the table) if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { let rowid_reg = program.alloc_register(); program.emit_insn(Insn::RowId { @@ -578,18 +579,12 @@ fn emit_delete_insns( }); let cdc_has_before = program.capture_data_changes_mode().has_before(); let before_record_reg = if cdc_has_before { - let columns = table_reference.columns(); - let columns_reg = program.alloc_registers(columns.len() + 1); - for i in 0..columns.len() { - program.emit_column(main_table_cursor_id, i, columns_reg + 1 + i); - } - program.emit_insn(Insn::MakeRecord { - start_reg: columns_reg + 1, - count: columns.len(), - dest_reg: columns_reg, - index_name: None, - }); - Some(columns_reg) + Some(emit_cdc_full_record( + program, + &table_reference.table, + main_table_cursor_id, + rowid_reg, + )) } else { None }; @@ -1116,77 +1111,36 @@ fn emit_update_insns( }); } - if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { - let rowid_reg = program.alloc_register(); - let has_after = program.capture_data_changes_mode().has_after(); - let has_before = program.capture_data_changes_mode().has_before(); - let before_record_reg = if has_before { - let columns = table_ref.columns(); - let columns_reg = program.alloc_registers(columns.len() + 1); - for i in 0..columns.len() { - program.emit_column(cursor_id, i, columns_reg + 1 + i); - } - program.emit_insn(Insn::MakeRecord { - start_reg: columns_reg + 1, - count: columns.len(), - dest_reg: columns_reg, - index_name: None, - }); - Some(columns_reg) - } else { - None - }; - let after_record_reg = if has_after { Some(record_reg) } else { None }; + // create alias for CDC rowid after the change (will differ from cdc_rowid_before_reg only in case of UPDATE with change in rowid alias) + let cdc_rowid_after_reg = rowid_set_clause_reg.unwrap_or(beg); + + // create separate register with rowid before UPDATE for CDC + let cdc_rowid_before_reg = if t_ctx.cdc_cursor_id.is_some() { + let cdc_rowid_before_reg = program.alloc_register(); if has_user_provided_rowid { program.emit_insn(Insn::RowId { cursor_id, - dest: rowid_reg, + dest: cdc_rowid_before_reg, }); - emit_cdc_insns( - program, - &t_ctx.resolver, - OperationMode::DELETE, - cdc_cursor_id, - rowid_reg, - before_record_reg, - None, - table_ref.table.get_name(), - )?; - program.emit_insn(Insn::Copy { - src_reg: rowid_set_clause_reg.expect( - "rowid_set_clause_reg must be set because has_user_provided_rowid is true", - ), - dst_reg: rowid_reg, - amount: 0, - }); - emit_cdc_insns( - program, - &t_ctx.resolver, - OperationMode::INSERT, - cdc_cursor_id, - rowid_reg, - None, - after_record_reg, - table_ref.table.get_name(), - )?; + Some(cdc_rowid_before_reg) } else { - program.emit_insn(Insn::Copy { - src_reg: rowid_set_clause_reg.unwrap_or(beg), - dst_reg: rowid_reg, - amount: 0, - }); - emit_cdc_insns( - program, - &t_ctx.resolver, - OperationMode::UPDATE, - cdc_cursor_id, - rowid_reg, - before_record_reg, - after_record_reg, - table_ref.table.get_name(), - )?; + Some(cdc_rowid_after_reg) } - } + } else { + None + }; + + // create full CDC record before update if necessary + let cdc_before_reg = if program.capture_data_changes_mode().has_before() { + Some(emit_cdc_full_record( + program, + &table_ref.table, + cursor_id, + cdc_rowid_before_reg.expect("cdc_rowid_before_reg must be set"), + )) + } else { + None + }; // If we are updating the rowid, we cannot rely on overwrite on the // Insert instruction to update the cell. We need to first delete the current cell @@ -1202,6 +1156,58 @@ fn emit_update_insns( flag: InsertFlags::new().update(true), table_name: table_ref.identifier.clone(), }); + + // create full CDC record after update if necessary + let cdc_after_reg = if program.capture_data_changes_mode().has_after() { + Some(emit_cdc_patch_record( + program, + &table_ref.table, + start, + record_reg, + cdc_rowid_after_reg, + )) + } else { + None + }; + + // emit actual CDC instructions for write to the CDC table + if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { + let cdc_rowid_before_reg = + cdc_rowid_before_reg.expect("cdc_rowid_before_reg must be set"); + if has_user_provided_rowid { + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::DELETE, + cdc_cursor_id, + cdc_rowid_before_reg, + cdc_before_reg, + None, + table_ref.table.get_name(), + )?; + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::INSERT, + cdc_cursor_id, + cdc_rowid_after_reg, + cdc_after_reg, + None, + table_ref.table.get_name(), + )?; + } else { + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::UPDATE, + cdc_cursor_id, + cdc_rowid_before_reg, + cdc_before_reg, + cdc_after_reg, + table_ref.table.get_name(), + )?; + } + } } else if table_ref.virtual_table().is_some() { let arg_count = table_ref.columns().len() + 2; program.emit_insn(Insn::VUpdate { @@ -1227,6 +1233,62 @@ fn emit_update_insns( Ok(()) } +pub fn emit_cdc_patch_record( + program: &mut ProgramBuilder, + table: &Table, + columns_reg: usize, + record_reg: usize, + rowid_reg: usize, +) -> usize { + let columns = table.columns(); + let rowid_alias_position = columns.iter().position(|x| x.is_rowid_alias); + if let Some(rowid_alias_position) = rowid_alias_position { + let record_reg = program.alloc_register(); + program.emit_insn(Insn::Copy { + src_reg: rowid_reg, + dst_reg: columns_reg + rowid_alias_position, + amount: 0, + }); + program.emit_insn(Insn::MakeRecord { + start_reg: columns_reg, + count: table.columns().len(), + dest_reg: record_reg, + index_name: None, + }); + record_reg + } else { + record_reg + } +} + +pub fn emit_cdc_full_record( + program: &mut ProgramBuilder, + table: &Table, + table_cursor_id: usize, + rowid_reg: usize, +) -> usize { + let columns = table.columns(); + let columns_reg = program.alloc_registers(columns.len() + 1); + for (i, column) in columns.iter().enumerate() { + if column.is_rowid_alias { + program.emit_insn(Insn::Copy { + src_reg: rowid_reg, + dst_reg: columns_reg + 1 + i, + amount: 0, + }); + } else { + program.emit_column(table_cursor_id, i, columns_reg + 1 + i); + } + } + program.emit_insn(Insn::MakeRecord { + start_reg: columns_reg + 1, + count: columns.len(), + dest_reg: columns_reg, + index_name: None, + }); + columns_reg +} + pub fn emit_cdc_insns( program: &mut ProgramBuilder, resolver: &Resolver, diff --git a/core/translate/insert.rs b/core/translate/insert.rs index de5c49769..070e7d3c8 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -6,7 +6,7 @@ use turso_sqlite3_parser::ast::{ use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY}; use crate::schema::{IndexColumn, Table}; -use crate::translate::emitter::{emit_cdc_insns, OperationMode}; +use crate::translate::emitter::{emit_cdc_insns, emit_cdc_patch_record, OperationMode}; use crate::util::normalize_ident; use crate::vdbe::builder::ProgramBuilderOpts; use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral}; @@ -562,12 +562,25 @@ pub fn translate_insert( dest_reg: record_register, index_name: None, }); + program.emit_insn(Insn::Insert { + cursor: cursor_id, + key_reg: rowid_reg, + record_reg: record_register, + flag: InsertFlags::new(), + table_name: table_name.to_string(), + }); - // Write record to the turso_cdc table if necessary + // Emit update in the CDC table if necessary (after the INSERT updated the table) if let Some((cdc_cursor_id, _)) = &cdc_table { let cdc_has_after = program.capture_data_changes_mode().has_after(); let after_record_reg = if cdc_has_after { - Some(record_register) + Some(emit_cdc_patch_record( + &mut program, + &table, + column_registers_start, + record_register, + rowid_reg, + )) } else { None }; @@ -583,14 +596,6 @@ pub fn translate_insert( )?; } - program.emit_insn(Insn::Insert { - cursor: cursor_id, - key_reg: rowid_reg, - record_reg: record_register, - flag: InsertFlags::new(), - table_name: table_name.to_string(), - }); - if inserting_multiple_rows { if let Some(temp_table_ctx) = temp_table_ctx { program.emit_insn(Insn::Next { diff --git a/tests/integration/functions/test_cdc.rs b/tests/integration/functions/test_cdc.rs index d9ed347c7..28b5d0483 100644 --- a/tests/integration/functions/test_cdc.rs +++ b/tests/integration/functions/test_cdc.rs @@ -115,7 +115,7 @@ fn test_cdc_simple_before() { Value::Integer(0), Value::Text("t".to_string()), Value::Integer(1), - Value::Blob(record([Value::Null, Value::Integer(2)])), + Value::Blob(record([Value::Integer(1), Value::Integer(2)])), Value::Null, ], vec![ @@ -124,7 +124,7 @@ fn test_cdc_simple_before() { Value::Integer(-1), Value::Text("t".to_string()), Value::Integer(3), - Value::Blob(record([Value::Null, Value::Integer(4)])), + Value::Blob(record([Value::Integer(3), Value::Integer(4)])), Value::Null, ], vec![ @@ -133,7 +133,7 @@ fn test_cdc_simple_before() { Value::Integer(-1), Value::Text("t".to_string()), Value::Integer(1), - Value::Blob(record([Value::Null, Value::Integer(3)])), + Value::Blob(record([Value::Integer(1), Value::Integer(3)])), Value::Null, ] ] @@ -164,7 +164,7 @@ fn test_cdc_simple_after() { Value::Text("t".to_string()), Value::Integer(1), Value::Null, - Value::Blob(record([Value::Null, Value::Integer(2)])), + Value::Blob(record([Value::Integer(1), Value::Integer(2)])), ], vec![ Value::Integer(2), @@ -173,7 +173,7 @@ fn test_cdc_simple_after() { Value::Text("t".to_string()), Value::Integer(3), Value::Null, - Value::Blob(record([Value::Null, Value::Integer(4)])), + Value::Blob(record([Value::Integer(3), Value::Integer(4)])), ], vec![ Value::Integer(3), @@ -182,7 +182,7 @@ fn test_cdc_simple_after() { Value::Text("t".to_string()), Value::Integer(1), Value::Null, - Value::Blob(record([Value::Null, Value::Integer(3)])), + Value::Blob(record([Value::Integer(1), Value::Integer(3)])), ], vec![ Value::Integer(4), @@ -230,7 +230,7 @@ fn test_cdc_simple_full() { Value::Text("t".to_string()), Value::Integer(1), Value::Null, - Value::Blob(record([Value::Null, Value::Integer(2)])), + Value::Blob(record([Value::Integer(1), Value::Integer(2)])), ], vec![ Value::Integer(2), @@ -239,7 +239,7 @@ fn test_cdc_simple_full() { Value::Text("t".to_string()), Value::Integer(3), Value::Null, - Value::Blob(record([Value::Null, Value::Integer(4)])), + Value::Blob(record([Value::Integer(3), Value::Integer(4)])), ], vec![ Value::Integer(3), @@ -247,8 +247,8 @@ fn test_cdc_simple_full() { Value::Integer(0), Value::Text("t".to_string()), Value::Integer(1), - Value::Blob(record([Value::Null, Value::Integer(2)])), - Value::Blob(record([Value::Null, Value::Integer(3)])), + Value::Blob(record([Value::Integer(1), Value::Integer(2)])), + Value::Blob(record([Value::Integer(1), Value::Integer(3)])), ], vec![ Value::Integer(4), @@ -256,7 +256,7 @@ fn test_cdc_simple_full() { Value::Integer(-1), Value::Text("t".to_string()), Value::Integer(3), - Value::Blob(record([Value::Null, Value::Integer(4)])), + Value::Blob(record([Value::Integer(3), Value::Integer(4)])), Value::Null, ], vec![ @@ -265,7 +265,7 @@ fn test_cdc_simple_full() { Value::Integer(-1), Value::Text("t".to_string()), Value::Integer(1), - Value::Blob(record([Value::Null, Value::Integer(3)])), + Value::Blob(record([Value::Integer(1), Value::Integer(3)])), Value::Null, ] ] From eed89993f962705ba8543eda7fc3840643b5c8b4 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 11 Jul 2025 13:23:37 -0700 Subject: [PATCH 06/15] fix clippy --- core/translate/emitter.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 5116be14e..4169ba617 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -1289,6 +1289,7 @@ pub fn emit_cdc_full_record( columns_reg } +#[allow(clippy::too_many_arguments)] pub fn emit_cdc_insns( program: &mut ProgramBuilder, resolver: &Resolver, From 81cd04dd65abf0cc3608f53817b9c1d787357525 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 10 Jul 2025 19:25:15 +0400 Subject: [PATCH 07/15] add bin_record_json_object and table_columns_json_array functions --- core/function.rs | 8 ++ core/json/mod.rs | 39 ++++++--- core/storage/sqlite3_ondisk.rs | 14 +--- core/translate/expr.rs | 57 +++++++++++-- core/types.rs | 15 ++++ core/vdbe/execute.rs | 104 +++++++++++++++++++++++- tests/integration/functions/test_cdc.rs | 44 ++++++++++ 7 files changed, 249 insertions(+), 32 deletions(-) diff --git a/core/function.rs b/core/function.rs index 76d281262..777e1e1f3 100644 --- a/core/function.rs +++ b/core/function.rs @@ -318,6 +318,8 @@ pub enum ScalarFunc { Likely, TimeDiff, Likelihood, + TableColumnsJsonArray, + BinRecordJsonObject, } impl ScalarFunc { @@ -375,6 +377,8 @@ impl ScalarFunc { ScalarFunc::Likely => true, ScalarFunc::TimeDiff => false, ScalarFunc::Likelihood => true, + ScalarFunc::TableColumnsJsonArray => true, // while columns of the table can change with DDL statements, within single query plan it's static + ScalarFunc::BinRecordJsonObject => true, } } } @@ -434,6 +438,8 @@ impl Display for ScalarFunc { Self::Likely => "likely".to_string(), Self::TimeDiff => "timediff".to_string(), Self::Likelihood => "likelihood".to_string(), + Self::TableColumnsJsonArray => "table_columns_json_array".to_string(), + Self::BinRecordJsonObject => "bin_record_json_object".to_string(), }; write!(f, "{str}") } @@ -777,6 +783,8 @@ impl Func { "unhex" => Ok(Self::Scalar(ScalarFunc::Unhex)), "zeroblob" => Ok(Self::Scalar(ScalarFunc::ZeroBlob)), "soundex" => Ok(Self::Scalar(ScalarFunc::Soundex)), + "table_columns_json_array" => Ok(Self::Scalar(ScalarFunc::TableColumnsJsonArray)), + "bin_record_json_object" => Ok(Self::Scalar(ScalarFunc::BinRecordJsonObject)), "acos" => Ok(Self::Math(MathFunc::Acos)), "acosh" => Ok(Self::Math(MathFunc::Acosh)), "asin" => Ok(Self::Math(MathFunc::Asin)), diff --git a/core/json/mod.rs b/core/json/mod.rs index 5a4930f3c..438244efd 100644 --- a/core/json/mod.rs +++ b/core/json/mod.rs @@ -1,8 +1,8 @@ mod cache; mod error; -mod jsonb; +pub(crate) mod jsonb; mod ops; -mod path; +pub(crate) mod path; use crate::json::error::Error as JsonError; pub use crate::json::ops::{ @@ -10,9 +10,9 @@ pub use crate::json::ops::{ jsonb_replace, }; use crate::json::path::{json_path, JsonPath, PathElement}; -use crate::types::{Text, TextSubtype, Value, ValueType}; +use crate::types::{RawSlice, Text, TextRef, TextSubtype, Value, ValueType}; use crate::vdbe::Register; -use crate::{bail_constraint_error, bail_parse_error, LimboError}; +use crate::{bail_constraint_error, bail_parse_error, LimboError, RefValue}; pub use cache::JsonCacheCell; use jsonb::{ElementType, Jsonb, JsonbHeader, PathOperationMode, SearchOperation, SetOperation}; use std::borrow::Cow; @@ -26,7 +26,7 @@ pub enum Conv { } #[cfg(feature = "json")] -enum OutputVariant { +pub enum OutputVariant { ElementType, Binary, String, @@ -100,9 +100,22 @@ pub fn json_from_raw_bytes_agg(data: &[u8], raw: bool) -> crate::Result { } } -fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result { +pub fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result { + convert_ref_dbtype_to_jsonb( + &match val { + Value::Null => RefValue::Null, + Value::Integer(x) => RefValue::Integer(*x), + Value::Float(x) => RefValue::Float(*x), + Value::Text(text) => RefValue::Text(TextRef::create_from(text.as_str().as_bytes())), + Value::Blob(items) => RefValue::Blob(RawSlice::create_from(&items)), + }, + strict, + ) +} + +pub fn convert_ref_dbtype_to_jsonb(val: &RefValue, strict: Conv) -> crate::Result { match val { - Value::Text(text) => { + RefValue::Text(text) => { let res = if text.subtype == TextSubtype::Json || matches!(strict, Conv::Strict) { // Parse directly as JSON if it's already JSON subtype or strict mode is on let json = if matches!(strict, Conv::ToString) { @@ -125,20 +138,20 @@ fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result { }; res.map_err(|_| LimboError::ParseError("malformed JSON".to_string())) } - Value::Blob(blob) => { - let json = Jsonb::from_raw_data(blob); + RefValue::Blob(blob) => { + let json = Jsonb::from_raw_data(blob.to_slice()); json.is_valid()?; Ok(json) } - Value::Null => Ok(Jsonb::from_raw_data( + RefValue::Null => Ok(Jsonb::from_raw_data( JsonbHeader::make_null().into_bytes().as_bytes(), )), - Value::Float(float) => { + RefValue::Float(float) => { let mut buff = ryu::Buffer::new(); Jsonb::from_str(buff.format(*float)) .map_err(|_| LimboError::ParseError("malformed JSON".to_string())) } - Value::Integer(int) => Jsonb::from_str(&int.to_string()) + RefValue::Integer(int) => Jsonb::from_str(&int.to_string()) .map_err(|_| LimboError::ParseError("malformed JSON".to_string())), } } @@ -405,7 +418,7 @@ fn jsonb_extract_internal(value: Jsonb, paths: &[Register]) -> crate::Result<(Js Ok((result, ElementType::ARRAY)) } -fn json_string_to_db_type( +pub fn json_string_to_db_type( json: Jsonb, element_type: ElementType, flag: OutputVariant, diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index b2b545d36..2aa13d917 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -60,7 +60,7 @@ use crate::storage::btree::{payload_overflow_threshold_max, payload_overflow_thr use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::pager::Pager; -use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; +use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef}; use crate::{File, Result, WalFileShared}; use std::cell::{RefCell, UnsafeCell}; use std::collections::HashMap; @@ -1160,17 +1160,9 @@ pub fn read_value(buf: &[u8], serial_type: SerialType) -> Result<(RefValue, usiz content_size ); } - let slice = if content_size == 0 { - RawSlice::new(std::ptr::null(), 0) - } else { - let ptr = &buf[0] as *const u8; - RawSlice::new(ptr, content_size) - }; + Ok(( - RefValue::Text(TextRef { - value: slice, - subtype: TextSubtype::Text, - }), + RefValue::Text(TextRef::create_from(&buf[..content_size])), content_size, )) } diff --git a/core/translate/expr.rs b/core/translate/expr.rs index ea36918b6..99fc22404 100644 --- a/core/translate/expr.rs +++ b/core/translate/expr.rs @@ -1722,6 +1722,22 @@ pub fn translate_expr( ); } + let start_reg = program.alloc_register(); + program.emit_insn(Insn::Copy { + src_reg: start_reg, + dst_reg: target_register, + amount: 0, + }); + + Ok(target_register) + } + ScalarFunc::TableColumnsJsonArray => { + if args.is_none() || args.as_ref().unwrap().len() != 1 { + crate::bail_parse_error!( + "table_columns_json_array() function must have exactly 1 argument", + ); + } + let args = args.as_ref().unwrap(); let start_reg = program.alloc_register(); translate_expr( program, @@ -1730,13 +1746,42 @@ pub fn translate_expr( start_reg, resolver, )?; - - program.emit_insn(Insn::Copy { - src_reg: start_reg, - dst_reg: target_register, - amount: 0, + program.emit_insn(Insn::Function { + constant_mask: 0, + start_reg, + dest: target_register, + func: func_ctx, + }); + Ok(target_register) + } + ScalarFunc::BinRecordJsonObject => { + if args.is_none() || args.as_ref().unwrap().len() != 2 { + crate::bail_parse_error!( + "bin_record_json_object() function must have exactly 2 arguments", + ); + } + let args = args.as_ref().unwrap(); + let start_reg = program.alloc_registers(2); + translate_expr( + program, + referenced_tables, + &args[0], + start_reg, + resolver, + )?; + translate_expr( + program, + referenced_tables, + &args[1], + start_reg + 1, + resolver, + )?; + program.emit_insn(Insn::Function { + constant_mask: 0, + start_reg, + dest: target_register, + func: func_ctx, }); - Ok(target_register) } } diff --git a/core/types.rs b/core/types.rs index a3facffc6..95046a7e7 100644 --- a/core/types.rs +++ b/core/types.rs @@ -127,6 +127,13 @@ impl Display for TextRef { } impl TextRef { + pub fn create_from(value: &[u8]) -> Self { + let value = RawSlice::create_from(value); + Self { + value, + subtype: TextSubtype::Text, + } + } pub fn as_str(&self) -> &str { unsafe { std::str::from_utf8_unchecked(self.value.to_slice()) } } @@ -2346,6 +2353,14 @@ pub enum SeekKey<'a> { } impl RawSlice { + pub fn create_from(value: &[u8]) -> Self { + if value.len() == 0 { + RawSlice::new(std::ptr::null(), 0) + } else { + let ptr = &value[0] as *const u8; + RawSlice::new(ptr, value.len()) + } + } pub fn new(data: *const u8, len: usize) -> Self { Self { data, len } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index f7ab218a7..b701b4daa 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -28,6 +28,7 @@ use crate::{ }, }; use std::ops::DerefMut; +use std::str::FromStr; use std::sync::atomic::AtomicUsize; use std::sync::Mutex; use std::{borrow::BorrowMut, rc::Rc, sync::Arc}; @@ -79,8 +80,8 @@ use std::{cell::RefCell, collections::HashMap}; #[cfg(feature = "json")] use crate::{ - function::JsonFunc, json::convert_dbtype_to_raw_jsonb, json::get_json, json::is_json_valid, - json::json_array, json::json_array_length, json::json_arrow_extract, + function::JsonFunc, json, json::convert_dbtype_to_raw_jsonb, json::get_json, + json::is_json_valid, json::json_array, json::json_array_length, json::json_arrow_extract, json::json_arrow_shift_extract, json::json_error_position, json::json_extract, json::json_from_raw_bytes_agg, json::json_insert, json::json_object, json::json_patch, json::json_quote, json::json_remove, json::json_replace, json::json_set, json::json_type, @@ -4040,6 +4041,105 @@ pub fn op_function( .exec_likelihood(probability.get_owned_value()); state.registers[*dest] = Register::Value(result); } + ScalarFunc::TableColumnsJsonArray => { + assert_eq!(arg_count, 1); + let table = state.registers[*start_reg].get_owned_value(); + let Value::Text(table) = table else { + return Err(LimboError::InvalidArgument( + "table_columns_json_array: function argument must be of type TEXT" + .to_string(), + )); + }; + let table = { + let schema = program.connection.schema.borrow(); + match schema.get_table(table.as_str()) { + Some(table) => table, + None => { + return Err(LimboError::InvalidArgument(format!( + "table_columns_json_array: table {} doesn't exists", + table + ))) + } + } + }; + + let mut json = json::jsonb::Jsonb::make_empty_array(table.columns().len() * 10); + for column in table.columns() { + let name = column.name.as_ref().unwrap(); + let name_json = json::convert_ref_dbtype_to_jsonb( + &RefValue::Text(TextRef::create_from(name.as_str().as_bytes())), + json::Conv::ToString, + )?; + json.append_jsonb_to_end(name_json.data()); + } + json.finalize_unsafe(json::jsonb::ElementType::ARRAY)?; + state.registers[*dest] = Register::Value(json::json_string_to_db_type( + json, + json::jsonb::ElementType::ARRAY, + json::OutputVariant::String, + )?); + } + ScalarFunc::BinRecordJsonObject => { + assert_eq!(arg_count, 2); + 'outer: { + let columns_str = state.registers[*start_reg].get_owned_value(); + let bin_record = state.registers[*start_reg + 1].get_owned_value(); + let Value::Text(columns_str) = columns_str else { + return Err(LimboError::InvalidArgument( + "bin_record_json_object: function arguments must be of type TEXT and BLOB correspondingly".to_string() + )); + }; + + if let Value::Null = bin_record { + state.registers[*dest] = Register::Value(Value::Null); + break 'outer; + } + + let Value::Blob(bin_record) = bin_record else { + return Err(LimboError::InvalidArgument( + "bin_record_json_object: function arguments must be of type TEXT and BLOB correspondingly".to_string() + )); + }; + let mut columns_json_array = + json::jsonb::Jsonb::from_str(columns_str.as_str())?; + let columns_len = columns_json_array.array_len()?; + + let mut record = ImmutableRecord::new(bin_record.len()); + read_record(&bin_record, &mut record)?; + + let mut json = json::jsonb::Jsonb::make_empty_obj(record.len()); + for i in 0..columns_len { + let mut op = json::jsonb::SearchOperation::new(0); + let path = json::path::JsonPath { + elements: vec![ + json::path::PathElement::Root(), + json::path::PathElement::ArrayLocator(Some(i as i32)), + ], + }; + + columns_json_array.operate_on_path(&path, &mut op)?; + let column_name = op.result(); + json.append_jsonb_to_end(column_name.data()); + + let val = record.get_value(i); + if let RefValue::Blob(..) = val { + return Err(LimboError::InvalidArgument( + "bin_record_json_object: formatting of BLOB values stored in binary record is not supported".to_string() + )); + } + let val_json = + json::convert_ref_dbtype_to_jsonb(val, json::Conv::NotStrict)?; + json.append_jsonb_to_end(val_json.data()); + } + json.finalize_unsafe(json::jsonb::ElementType::OBJECT)?; + + state.registers[*dest] = Register::Value(json::json_string_to_db_type( + json, + json::jsonb::ElementType::OBJECT, + json::OutputVariant::String, + )?); + } + } }, crate::function::Func::Vector(vector_func) => match vector_func { VectorFunc::Vector => { diff --git a/tests/integration/functions/test_cdc.rs b/tests/integration/functions/test_cdc.rs index 28b5d0483..d88bf3c20 100644 --- a/tests/integration/functions/test_cdc.rs +++ b/tests/integration/functions/test_cdc.rs @@ -834,3 +834,47 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() { ] ); } + +#[test] +fn test_cdc_table_columns() { + let db = TempDatabase::new_empty(true); + let conn = db.connect_limbo(); + conn.execute("CREATE TABLE t(a INTEGER PRIMARY KEY, b, c UNIQUE)") + .unwrap(); + let rows = limbo_exec_rows(&db, &conn, "SELECT table_columns_json_array('t')"); + assert_eq!( + rows, + vec![vec![Value::Text(r#"["a","b","c"]"#.to_string())]] + ); + conn.execute("ALTER TABLE t DROP COLUMN b").unwrap(); + let rows = limbo_exec_rows(&db, &conn, "SELECT table_columns_json_array('t')"); + assert_eq!(rows, vec![vec![Value::Text(r#"["a","c"]"#.to_string())]]); +} + +#[test] +fn test_cdc_bin_record() { + let db = TempDatabase::new_empty(true); + let conn = db.connect_limbo(); + let record = record([ + Value::Null, + Value::Integer(1), + Value::Real(3.1415), + Value::Text("hello".to_string()), + ]); + let mut record_hex = String::new(); + for byte in record { + record_hex.push_str(&format!("{:02X}", byte)); + } + + let rows = limbo_exec_rows( + &db, + &conn, + &format!(r#"SELECT bin_record_json_object('["a","b","c","d"]', X'{record_hex}')"#), + ); + assert_eq!( + rows, + vec![vec![Value::Text( + r#"{"a":null,"b":1,"c":3.1415,"d":"hello"}"#.to_string() + )]] + ); +} From bf25a0e3f14c7b49f991af99af26c848916b0858 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 11 Jul 2025 14:57:05 -0700 Subject: [PATCH 08/15] fix clippy --- core/json/mod.rs | 2 +- core/types.rs | 2 +- core/vdbe/execute.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/json/mod.rs b/core/json/mod.rs index 438244efd..8c1d1dc1e 100644 --- a/core/json/mod.rs +++ b/core/json/mod.rs @@ -107,7 +107,7 @@ pub fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result RefValue::Integer(*x), Value::Float(x) => RefValue::Float(*x), Value::Text(text) => RefValue::Text(TextRef::create_from(text.as_str().as_bytes())), - Value::Blob(items) => RefValue::Blob(RawSlice::create_from(&items)), + Value::Blob(items) => RefValue::Blob(RawSlice::create_from(items)), }, strict, ) diff --git a/core/types.rs b/core/types.rs index 95046a7e7..5615e7c57 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2354,7 +2354,7 @@ pub enum SeekKey<'a> { impl RawSlice { pub fn create_from(value: &[u8]) -> Self { - if value.len() == 0 { + if value.is_empty() { RawSlice::new(std::ptr::null(), 0) } else { let ptr = &value[0] as *const u8; diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index b701b4daa..c5c9a234b 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4105,7 +4105,7 @@ pub fn op_function( let columns_len = columns_json_array.array_len()?; let mut record = ImmutableRecord::new(bin_record.len()); - read_record(&bin_record, &mut record)?; + read_record(bin_record, &mut record)?; let mut json = json::jsonb::Jsonb::make_empty_obj(record.len()); for i in 0..columns_len { From c9e7271eafc7663cfaa464dca78026690b0c6668 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 11 Jul 2025 15:29:21 -0700 Subject: [PATCH 09/15] properly pass subtype --- core/json/mod.rs | 4 +++- core/storage/sqlite3_ondisk.rs | 9 +++++++-- core/types.rs | 9 +++------ core/vdbe/execute.rs | 5 ++++- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/core/json/mod.rs b/core/json/mod.rs index 8c1d1dc1e..99da492c0 100644 --- a/core/json/mod.rs +++ b/core/json/mod.rs @@ -106,7 +106,9 @@ pub fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result RefValue::Null, Value::Integer(x) => RefValue::Integer(*x), Value::Float(x) => RefValue::Float(*x), - Value::Text(text) => RefValue::Text(TextRef::create_from(text.as_str().as_bytes())), + Value::Text(text) => { + RefValue::Text(TextRef::create_from(text.as_str().as_bytes(), text.subtype)) + } Value::Blob(items) => RefValue::Blob(RawSlice::create_from(items)), }, strict, diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 2aa13d917..5eb05e4ba 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -60,7 +60,9 @@ use crate::storage::btree::{payload_overflow_threshold_max, payload_overflow_thr use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::pager::Pager; -use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef}; +use crate::types::{ + RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype, +}; use crate::{File, Result, WalFileShared}; use std::cell::{RefCell, UnsafeCell}; use std::collections::HashMap; @@ -1162,7 +1164,10 @@ pub fn read_value(buf: &[u8], serial_type: SerialType) -> Result<(RefValue, usiz } Ok(( - RefValue::Text(TextRef::create_from(&buf[..content_size])), + RefValue::Text(TextRef::create_from( + &buf[..content_size], + TextSubtype::Text, + )), content_size, )) } diff --git a/core/types.rs b/core/types.rs index 5615e7c57..68d00f494 100644 --- a/core/types.rs +++ b/core/types.rs @@ -48,7 +48,7 @@ impl Display for ValueType { } } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum TextSubtype { Text, @@ -127,12 +127,9 @@ impl Display for TextRef { } impl TextRef { - pub fn create_from(value: &[u8]) -> Self { + pub fn create_from(value: &[u8], subtype: TextSubtype) -> Self { let value = RawSlice::create_from(value); - Self { - value, - subtype: TextSubtype::Text, - } + Self { value, subtype } } pub fn as_str(&self) -> &str { unsafe { std::str::from_utf8_unchecked(self.value.to_slice()) } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index c5c9a234b..736e11317 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4067,7 +4067,10 @@ pub fn op_function( for column in table.columns() { let name = column.name.as_ref().unwrap(); let name_json = json::convert_ref_dbtype_to_jsonb( - &RefValue::Text(TextRef::create_from(name.as_str().as_bytes())), + &RefValue::Text(TextRef::create_from( + name.as_str().as_bytes(), + TextSubtype::Text, + )), json::Conv::ToString, )?; json.append_jsonb_to_end(name_json.data()); From f61d733dd3bc8a243f77f417a0c5c01ca826ff0f Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 11 Jul 2025 15:35:15 -0700 Subject: [PATCH 10/15] make new functions dependend on "json" Cargo feature --- core/storage/sqlite3_ondisk.rs | 4 +- core/vdbe/execute.rs | 103 ++++++++++++++++++++------------- 2 files changed, 64 insertions(+), 43 deletions(-) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 5eb05e4ba..955be55d9 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -60,9 +60,7 @@ use crate::storage::btree::{payload_overflow_threshold_max, payload_overflow_thr use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::pager::Pager; -use crate::types::{ - RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype, -}; +use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; use crate::{File, Result, WalFileShared}; use std::cell::{RefCell, UnsafeCell}; use std::collections::HashMap; diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 736e11317..a7ac46f94 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -28,10 +28,9 @@ use crate::{ }, }; use std::ops::DerefMut; -use std::str::FromStr; -use std::sync::atomic::AtomicUsize; use std::sync::Mutex; use std::{borrow::BorrowMut, rc::Rc, sync::Arc}; +use std::{ sync::atomic::AtomicUsize}; use crate::{pseudo::PseudoCursor, result::LimboResult}; @@ -4043,48 +4042,71 @@ pub fn op_function( } ScalarFunc::TableColumnsJsonArray => { assert_eq!(arg_count, 1); - let table = state.registers[*start_reg].get_owned_value(); - let Value::Text(table) = table else { + #[cfg(not(feature = "json"))] + { return Err(LimboError::InvalidArgument( - "table_columns_json_array: function argument must be of type TEXT" + "table_columns_json_array: turso must be compiled with JSON support" .to_string(), )); - }; - let table = { - let schema = program.connection.schema.borrow(); - match schema.get_table(table.as_str()) { - Some(table) => table, - None => { - return Err(LimboError::InvalidArgument(format!( - "table_columns_json_array: table {} doesn't exists", - table - ))) - } - } - }; - - let mut json = json::jsonb::Jsonb::make_empty_array(table.columns().len() * 10); - for column in table.columns() { - let name = column.name.as_ref().unwrap(); - let name_json = json::convert_ref_dbtype_to_jsonb( - &RefValue::Text(TextRef::create_from( - name.as_str().as_bytes(), - TextSubtype::Text, - )), - json::Conv::ToString, - )?; - json.append_jsonb_to_end(name_json.data()); } - json.finalize_unsafe(json::jsonb::ElementType::ARRAY)?; - state.registers[*dest] = Register::Value(json::json_string_to_db_type( - json, - json::jsonb::ElementType::ARRAY, - json::OutputVariant::String, - )?); + #[cfg(feature = "json")] + { + use crate::types::{TextRef, TextSubtype}; + + let table = state.registers[*start_reg].get_owned_value(); + let Value::Text(table) = table else { + return Err(LimboError::InvalidArgument( + "table_columns_json_array: function argument must be of type TEXT" + .to_string(), + )); + }; + let table = { + let schema = program.connection.schema.borrow(); + match schema.get_table(table.as_str()) { + Some(table) => table, + None => { + return Err(LimboError::InvalidArgument(format!( + "table_columns_json_array: table {} doesn't exists", + table + ))) + } + } + }; + + let mut json = json::jsonb::Jsonb::make_empty_array(table.columns().len() * 10); + for column in table.columns() { + let name = column.name.as_ref().unwrap(); + let name_json = json::convert_ref_dbtype_to_jsonb( + &RefValue::Text(TextRef::create_from( + name.as_str().as_bytes(), + TextSubtype::Text, + )), + json::Conv::ToString, + )?; + json.append_jsonb_to_end(name_json.data()); + } + json.finalize_unsafe(json::jsonb::ElementType::ARRAY)?; + state.registers[*dest] = Register::Value(json::json_string_to_db_type( + json, + json::jsonb::ElementType::ARRAY, + json::OutputVariant::String, + )?); + } } ScalarFunc::BinRecordJsonObject => { assert_eq!(arg_count, 2); + #[cfg(not(feature = "json"))] + { + return Err(LimboError::InvalidArgument( + "bin_record_json_object: turso must be compiled with JSON support" + .to_string(), + )); + } + #[cfg(feature = "json")] 'outer: { + use crate::types::RecordCursor; + use std::str::FromStr; + let columns_str = state.registers[*start_reg].get_owned_value(); let bin_record = state.registers[*start_reg + 1].get_owned_value(); let Value::Text(columns_str) = columns_str else { @@ -4108,9 +4130,10 @@ pub fn op_function( let columns_len = columns_json_array.array_len()?; let mut record = ImmutableRecord::new(bin_record.len()); - read_record(bin_record, &mut record)?; + record.start_serialization(&bin_record); + let mut record_cursor = RecordCursor::new(); - let mut json = json::jsonb::Jsonb::make_empty_obj(record.len()); + let mut json = json::jsonb::Jsonb::make_empty_obj(columns_len); for i in 0..columns_len { let mut op = json::jsonb::SearchOperation::new(0); let path = json::path::JsonPath { @@ -4124,14 +4147,14 @@ pub fn op_function( let column_name = op.result(); json.append_jsonb_to_end(column_name.data()); - let val = record.get_value(i); + let val = record_cursor.get_value(&record, i)?; if let RefValue::Blob(..) = val { return Err(LimboError::InvalidArgument( "bin_record_json_object: formatting of BLOB values stored in binary record is not supported".to_string() )); } let val_json = - json::convert_ref_dbtype_to_jsonb(val, json::Conv::NotStrict)?; + json::convert_ref_dbtype_to_jsonb(&val, json::Conv::NotStrict)?; json.append_jsonb_to_end(val_json.data()); } json.finalize_unsafe(json::jsonb::ElementType::OBJECT)?; From cc04f11bd670d81be1e804b95f18a88f580ed3a6 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 11 Jul 2025 15:37:46 -0700 Subject: [PATCH 11/15] remove clone --- core/types.rs | 4 ++-- core/vdbe/execute.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/types.rs b/core/types.rs index 68d00f494..43494894f 100644 --- a/core/types.rs +++ b/core/types.rs @@ -963,7 +963,7 @@ impl ImmutableRecord { let ptr = unsafe { writer.buf.as_ptr().add(start_offset) }; let value = RefValue::Text(TextRef { value: RawSlice::new(ptr, len), - subtype: t.subtype.clone(), + subtype: t.subtype, }); ref_values.push(value); } @@ -1365,7 +1365,7 @@ impl RefValue { RefValue::Float(f) => Value::Float(*f), RefValue::Text(text_ref) => Value::Text(Text { value: text_ref.value.to_slice().to_vec(), - subtype: text_ref.subtype.clone(), + subtype: text_ref.subtype, }), RefValue::Blob(b) => Value::Blob(b.to_slice().to_vec()), } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index a7ac46f94..ab1ca9995 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -28,9 +28,9 @@ use crate::{ }, }; use std::ops::DerefMut; +use std::sync::atomic::AtomicUsize; use std::sync::Mutex; use std::{borrow::BorrowMut, rc::Rc, sync::Arc}; -use std::{ sync::atomic::AtomicUsize}; use crate::{pseudo::PseudoCursor, result::LimboResult}; From 551c353fff4505b4a0fecadb0c8c5b86d6fdc59b Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Sat, 12 Jul 2025 13:53:20 +0400 Subject: [PATCH 12/15] fix clippy --- tests/integration/functions/test_cdc.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/functions/test_cdc.rs b/tests/integration/functions/test_cdc.rs index d88bf3c20..1d683eefc 100644 --- a/tests/integration/functions/test_cdc.rs +++ b/tests/integration/functions/test_cdc.rs @@ -858,7 +858,8 @@ fn test_cdc_bin_record() { let record = record([ Value::Null, Value::Integer(1), - Value::Real(3.1415), + // use golden ratio instead of pi because clippy has weird rule that I can't use PI approximation written by hand + Value::Real(1.61803), Value::Text("hello".to_string()), ]); let mut record_hex = String::new(); @@ -874,7 +875,7 @@ fn test_cdc_bin_record() { assert_eq!( rows, vec![vec![Value::Text( - r#"{"a":null,"b":1,"c":3.1415,"d":"hello"}"#.to_string() + r#"{"a":null,"b":1,"c":1.61803,"d":"hello"}"#.to_string() )]] ); } From e94ebbad047f71816c8e5c9a8359138d8e930d57 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Sat, 12 Jul 2025 13:57:02 +0400 Subject: [PATCH 13/15] remove unwanted changes --- core/translate/expr.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/translate/expr.rs b/core/translate/expr.rs index 99fc22404..bc82c1cae 100644 --- a/core/translate/expr.rs +++ b/core/translate/expr.rs @@ -1723,12 +1723,18 @@ pub fn translate_expr( } let start_reg = program.alloc_register(); + translate_expr( + program, + referenced_tables, + &args[0], + start_reg, + resolver, + )?; program.emit_insn(Insn::Copy { src_reg: start_reg, dst_reg: target_register, amount: 0, }); - Ok(target_register) } ScalarFunc::TableColumnsJsonArray => { From b330c6b70ebdf9af9e78590f98bccaa944bd0416 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 14 Jul 2025 11:38:08 +0400 Subject: [PATCH 14/15] fix clippy --- core/vdbe/execute.rs | 4 ++-- core/vdbe/mod.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index ab1ca9995..9079d23a0 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1597,7 +1597,7 @@ pub fn op_column( if existing_text.value.capacity() >= new_text.value.len() { existing_text.value.clear(); existing_text.value.extend_from_slice(&new_text.value); - existing_text.subtype = new_text.subtype.clone(); + existing_text.subtype = new_text.subtype; } else { state.registers[*dest] = Register::Value(value); } @@ -4130,7 +4130,7 @@ pub fn op_function( let columns_len = columns_json_array.array_len()?; let mut record = ImmutableRecord::new(bin_record.len()); - record.start_serialization(&bin_record); + record.start_serialization(bin_record); let mut record_cursor = RecordCursor::new(); let mut json = json::jsonb::Jsonb::make_empty_obj(columns_len); diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 776cbb9f8..56a90dcef 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -598,7 +598,7 @@ pub fn registers_to_ref_values(registers: &[Register]) -> Vec { Value::Float(f) => RefValue::Float(*f), Value::Text(t) => RefValue::Text(TextRef { value: RawSlice::new(t.value.as_ptr(), t.value.len()), - subtype: t.subtype.clone(), + subtype: t.subtype, }), Value::Blob(b) => RefValue::Blob(RawSlice::new(b.as_ptr(), b.len())), } From 0457567714044743e8900174564d2451440283ff Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 14 Jul 2025 12:09:39 +0400 Subject: [PATCH 15/15] more clippy fixes --- core/types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/types.rs b/core/types.rs index 43494894f..df477cfdd 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2426,7 +2426,7 @@ mod tests { Value::Float(f) => RefValue::Float(*f), Value::Text(text) => RefValue::Text(TextRef { value: RawSlice::from_slice(&text.value), - subtype: text.subtype.clone(), + subtype: text.subtype, }), Value::Blob(blob) => RefValue::Blob(RawSlice::from_slice(blob)), }