diff --git a/core/function.rs b/core/function.rs index 76d281262..777e1e1f3 100644 --- a/core/function.rs +++ b/core/function.rs @@ -318,6 +318,8 @@ pub enum ScalarFunc { Likely, TimeDiff, Likelihood, + TableColumnsJsonArray, + BinRecordJsonObject, } impl ScalarFunc { @@ -375,6 +377,8 @@ impl ScalarFunc { ScalarFunc::Likely => true, ScalarFunc::TimeDiff => false, ScalarFunc::Likelihood => true, + ScalarFunc::TableColumnsJsonArray => true, // while columns of the table can change with DDL statements, within single query plan it's static + ScalarFunc::BinRecordJsonObject => true, } } } @@ -434,6 +438,8 @@ impl Display for ScalarFunc { Self::Likely => "likely".to_string(), Self::TimeDiff => "timediff".to_string(), Self::Likelihood => "likelihood".to_string(), + Self::TableColumnsJsonArray => "table_columns_json_array".to_string(), + Self::BinRecordJsonObject => "bin_record_json_object".to_string(), }; write!(f, "{str}") } @@ -777,6 +783,8 @@ impl Func { "unhex" => Ok(Self::Scalar(ScalarFunc::Unhex)), "zeroblob" => Ok(Self::Scalar(ScalarFunc::ZeroBlob)), "soundex" => Ok(Self::Scalar(ScalarFunc::Soundex)), + "table_columns_json_array" => Ok(Self::Scalar(ScalarFunc::TableColumnsJsonArray)), + "bin_record_json_object" => Ok(Self::Scalar(ScalarFunc::BinRecordJsonObject)), "acos" => Ok(Self::Math(MathFunc::Acos)), "acosh" => Ok(Self::Math(MathFunc::Acosh)), "asin" => Ok(Self::Math(MathFunc::Asin)), diff --git a/core/json/mod.rs b/core/json/mod.rs index 5a4930f3c..99da492c0 100644 --- a/core/json/mod.rs +++ b/core/json/mod.rs @@ -1,8 +1,8 @@ mod cache; mod error; -mod jsonb; +pub(crate) mod jsonb; mod ops; -mod path; +pub(crate) mod path; use crate::json::error::Error as JsonError; pub use crate::json::ops::{ @@ -10,9 +10,9 @@ pub use crate::json::ops::{ jsonb_replace, }; use crate::json::path::{json_path, JsonPath, PathElement}; -use crate::types::{Text, TextSubtype, Value, ValueType}; +use crate::types::{RawSlice, Text, TextRef, TextSubtype, Value, ValueType}; use crate::vdbe::Register; -use crate::{bail_constraint_error, bail_parse_error, LimboError}; +use crate::{bail_constraint_error, bail_parse_error, LimboError, RefValue}; pub use cache::JsonCacheCell; use jsonb::{ElementType, Jsonb, JsonbHeader, PathOperationMode, SearchOperation, SetOperation}; use std::borrow::Cow; @@ -26,7 +26,7 @@ pub enum Conv { } #[cfg(feature = "json")] -enum OutputVariant { +pub enum OutputVariant { ElementType, Binary, String, @@ -100,9 +100,24 @@ pub fn json_from_raw_bytes_agg(data: &[u8], raw: bool) -> crate::Result { } } -fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result { +pub fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result { + convert_ref_dbtype_to_jsonb( + &match val { + Value::Null => RefValue::Null, + Value::Integer(x) => RefValue::Integer(*x), + Value::Float(x) => RefValue::Float(*x), + Value::Text(text) => { + RefValue::Text(TextRef::create_from(text.as_str().as_bytes(), text.subtype)) + } + Value::Blob(items) => RefValue::Blob(RawSlice::create_from(items)), + }, + strict, + ) +} + +pub fn convert_ref_dbtype_to_jsonb(val: &RefValue, strict: Conv) -> crate::Result { match val { - Value::Text(text) => { + RefValue::Text(text) => { let res = if text.subtype == TextSubtype::Json || matches!(strict, Conv::Strict) { // Parse directly as JSON if it's already JSON subtype or strict mode is on let json = if matches!(strict, Conv::ToString) { @@ -125,20 +140,20 @@ fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result { }; res.map_err(|_| LimboError::ParseError("malformed JSON".to_string())) } - Value::Blob(blob) => { - let json = Jsonb::from_raw_data(blob); + RefValue::Blob(blob) => { + let json = Jsonb::from_raw_data(blob.to_slice()); json.is_valid()?; Ok(json) } - Value::Null => Ok(Jsonb::from_raw_data( + RefValue::Null => Ok(Jsonb::from_raw_data( JsonbHeader::make_null().into_bytes().as_bytes(), )), - Value::Float(float) => { + RefValue::Float(float) => { let mut buff = ryu::Buffer::new(); Jsonb::from_str(buff.format(*float)) .map_err(|_| LimboError::ParseError("malformed JSON".to_string())) } - Value::Integer(int) => Jsonb::from_str(&int.to_string()) + RefValue::Integer(int) => Jsonb::from_str(&int.to_string()) .map_err(|_| LimboError::ParseError("malformed JSON".to_string())), } } @@ -405,7 +420,7 @@ fn jsonb_extract_internal(value: Jsonb, paths: &[Register]) -> crate::Result<(Js Ok((result, ElementType::ARRAY)) } -fn json_string_to_db_type( +pub fn json_string_to_db_type( json: Jsonb, element_type: ElementType, flag: OutputVariant, diff --git a/core/lib.rs b/core/lib.rs index 7f9b8a04f..130503ede 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -443,7 +443,10 @@ fn get_schema_version(conn: &Arc) -> Result { #[derive(Debug, Clone, Eq, PartialEq)] pub enum CaptureDataChangesMode { Off, - RowidOnly { table: String }, + Id { table: String }, + Before { table: String }, + After { table: String }, + Full { table: String }, } impl CaptureDataChangesMode { @@ -453,22 +456,43 @@ impl CaptureDataChangesMode { .unwrap_or((value, TURSO_CDC_DEFAULT_TABLE_NAME)); match mode { "off" => Ok(CaptureDataChangesMode::Off), - "rowid-only" => Ok(CaptureDataChangesMode::RowidOnly { table: table.to_string() }), + "id" => Ok(CaptureDataChangesMode::Id { table: table.to_string() }), + "before" => Ok(CaptureDataChangesMode::Before { table: table.to_string() }), + "after" => Ok(CaptureDataChangesMode::After { table: table.to_string() }), + "full" => Ok(CaptureDataChangesMode::Full { table: table.to_string() }), _ => Err(LimboError::InvalidArgument( - "unexpected pragma value: expected '' or ',' parameter where mode is one of off|rowid-only".to_string(), + "unexpected pragma value: expected '' or ',' parameter where mode is one of off|id|before|after|full".to_string(), )) } } + pub fn has_after(&self) -> bool { + matches!( + self, + CaptureDataChangesMode::After { .. } | CaptureDataChangesMode::Full { .. } + ) + } + pub fn has_before(&self) -> bool { + matches!( + self, + CaptureDataChangesMode::Before { .. } | CaptureDataChangesMode::Full { .. } + ) + } pub fn mode_name(&self) -> &str { match self { CaptureDataChangesMode::Off => "off", - CaptureDataChangesMode::RowidOnly { .. } => "rowid-only", + CaptureDataChangesMode::Id { .. } => "id", + CaptureDataChangesMode::Before { .. } => "before", + CaptureDataChangesMode::After { .. } => "after", + CaptureDataChangesMode::Full { .. } => "full", } } pub fn table(&self) -> Option<&str> { match self { CaptureDataChangesMode::Off => None, - CaptureDataChangesMode::RowidOnly { table } => Some(table.as_str()), + CaptureDataChangesMode::Id { table } + | CaptureDataChangesMode::Before { table } + | CaptureDataChangesMode::After { table } + | CaptureDataChangesMode::Full { table } => Some(table.as_str()), } } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index b2b545d36..955be55d9 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1160,17 +1160,12 @@ pub fn read_value(buf: &[u8], serial_type: SerialType) -> Result<(RefValue, usiz content_size ); } - let slice = if content_size == 0 { - RawSlice::new(std::ptr::null(), 0) - } else { - let ptr = &buf[0] as *const u8; - RawSlice::new(ptr, content_size) - }; + Ok(( - RefValue::Text(TextRef { - value: slice, - subtype: TextSubtype::Text, - }), + RefValue::Text(TextRef::create_from( + &buf[..content_size], + TextSubtype::Text, + )), content_size, )) } diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 0b50c40a6..4169ba617 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -22,7 +22,7 @@ use super::select::emit_simple_count; use super::subquery::emit_subqueries; use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY; use crate::function::Func; -use crate::schema::Schema; +use crate::schema::{Schema, Table}; use crate::translate::compound_select::emit_program_for_compound_select; use crate::translate::plan::{DeletePlan, Plan, QueryDestination, Search}; use crate::translate::values::emit_values; @@ -149,7 +149,7 @@ pub struct TranslateCtx<'a> { /// - First: all `GROUP BY` expressions, in the order they appear in the `GROUP BY` clause. /// - Then: remaining non-aggregate expressions that are not part of `GROUP BY`. pub non_aggregate_expressions: Vec<(&'a Expr, bool)>, - /// Cursor id for turso_cdc table (if capture_data_changes=on is set and query can modify the data) + /// Cursor id for cdc table (if capture_data_changes PRAGMA is set and query can modify the data) pub cdc_cursor_id: Option, } @@ -570,18 +570,32 @@ fn emit_delete_insns( } } - if let Some(turso_cdc_cursor_id) = t_ctx.cdc_cursor_id { + // Emit update in the CDC table if necessary (before DELETE updated the table) + if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { let rowid_reg = program.alloc_register(); program.emit_insn(Insn::RowId { cursor_id: main_table_cursor_id, dest: rowid_reg, }); + let cdc_has_before = program.capture_data_changes_mode().has_before(); + let before_record_reg = if cdc_has_before { + Some(emit_cdc_full_record( + program, + &table_reference.table, + main_table_cursor_id, + rowid_reg, + )) + } else { + None + }; emit_cdc_insns( program, &t_ctx.resolver, OperationMode::DELETE, - turso_cdc_cursor_id, + cdc_cursor_id, rowid_reg, + before_record_reg, + None, table_reference.table.get_name(), )?; } @@ -1097,52 +1111,36 @@ fn emit_update_insns( }); } - if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { - let rowid_reg = program.alloc_register(); + // create alias for CDC rowid after the change (will differ from cdc_rowid_before_reg only in case of UPDATE with change in rowid alias) + let cdc_rowid_after_reg = rowid_set_clause_reg.unwrap_or(beg); + + // create separate register with rowid before UPDATE for CDC + let cdc_rowid_before_reg = if t_ctx.cdc_cursor_id.is_some() { + let cdc_rowid_before_reg = program.alloc_register(); if has_user_provided_rowid { program.emit_insn(Insn::RowId { cursor_id, - dest: rowid_reg, + dest: cdc_rowid_before_reg, }); - emit_cdc_insns( - program, - &t_ctx.resolver, - OperationMode::DELETE, - cdc_cursor_id, - rowid_reg, - table_ref.table.get_name(), - )?; - program.emit_insn(Insn::Copy { - src_reg: rowid_set_clause_reg.expect( - "rowid_set_clause_reg must be set because has_user_provided_rowid is true", - ), - dst_reg: rowid_reg, - amount: 1, - }); - emit_cdc_insns( - program, - &t_ctx.resolver, - OperationMode::INSERT, - cdc_cursor_id, - rowid_reg, - table_ref.table.get_name(), - )?; + Some(cdc_rowid_before_reg) } else { - program.emit_insn(Insn::Copy { - src_reg: rowid_set_clause_reg.unwrap_or(beg), - dst_reg: rowid_reg, - amount: 1, - }); - emit_cdc_insns( - program, - &t_ctx.resolver, - OperationMode::UPDATE, - cdc_cursor_id, - rowid_reg, - table_ref.table.get_name(), - )?; + Some(cdc_rowid_after_reg) } - } + } else { + None + }; + + // create full CDC record before update if necessary + let cdc_before_reg = if program.capture_data_changes_mode().has_before() { + Some(emit_cdc_full_record( + program, + &table_ref.table, + cursor_id, + cdc_rowid_before_reg.expect("cdc_rowid_before_reg must be set"), + )) + } else { + None + }; // If we are updating the rowid, we cannot rely on overwrite on the // Insert instruction to update the cell. We need to first delete the current cell @@ -1158,6 +1156,58 @@ fn emit_update_insns( flag: InsertFlags::new().update(true), table_name: table_ref.identifier.clone(), }); + + // create full CDC record after update if necessary + let cdc_after_reg = if program.capture_data_changes_mode().has_after() { + Some(emit_cdc_patch_record( + program, + &table_ref.table, + start, + record_reg, + cdc_rowid_after_reg, + )) + } else { + None + }; + + // emit actual CDC instructions for write to the CDC table + if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { + let cdc_rowid_before_reg = + cdc_rowid_before_reg.expect("cdc_rowid_before_reg must be set"); + if has_user_provided_rowid { + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::DELETE, + cdc_cursor_id, + cdc_rowid_before_reg, + cdc_before_reg, + None, + table_ref.table.get_name(), + )?; + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::INSERT, + cdc_cursor_id, + cdc_rowid_after_reg, + cdc_after_reg, + None, + table_ref.table.get_name(), + )?; + } else { + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::UPDATE, + cdc_cursor_id, + cdc_rowid_before_reg, + cdc_before_reg, + cdc_after_reg, + table_ref.table.get_name(), + )?; + } + } } else if table_ref.virtual_table().is_some() { let arg_count = table_ref.columns().len() + 2; program.emit_insn(Insn::VUpdate { @@ -1183,16 +1233,75 @@ fn emit_update_insns( Ok(()) } +pub fn emit_cdc_patch_record( + program: &mut ProgramBuilder, + table: &Table, + columns_reg: usize, + record_reg: usize, + rowid_reg: usize, +) -> usize { + let columns = table.columns(); + let rowid_alias_position = columns.iter().position(|x| x.is_rowid_alias); + if let Some(rowid_alias_position) = rowid_alias_position { + let record_reg = program.alloc_register(); + program.emit_insn(Insn::Copy { + src_reg: rowid_reg, + dst_reg: columns_reg + rowid_alias_position, + amount: 0, + }); + program.emit_insn(Insn::MakeRecord { + start_reg: columns_reg, + count: table.columns().len(), + dest_reg: record_reg, + index_name: None, + }); + record_reg + } else { + record_reg + } +} + +pub fn emit_cdc_full_record( + program: &mut ProgramBuilder, + table: &Table, + table_cursor_id: usize, + rowid_reg: usize, +) -> usize { + let columns = table.columns(); + let columns_reg = program.alloc_registers(columns.len() + 1); + for (i, column) in columns.iter().enumerate() { + if column.is_rowid_alias { + program.emit_insn(Insn::Copy { + src_reg: rowid_reg, + dst_reg: columns_reg + 1 + i, + amount: 0, + }); + } else { + program.emit_column(table_cursor_id, i, columns_reg + 1 + i); + } + } + program.emit_insn(Insn::MakeRecord { + start_reg: columns_reg + 1, + count: columns.len(), + dest_reg: columns_reg, + index_name: None, + }); + columns_reg +} + +#[allow(clippy::too_many_arguments)] pub fn emit_cdc_insns( program: &mut ProgramBuilder, resolver: &Resolver, operation_mode: OperationMode, cdc_cursor_id: usize, rowid_reg: usize, + before_record_reg: Option, + after_record_reg: Option, table_name: &str, ) -> Result<()> { - // (operation_id INTEGER PRIMARY KEY AUTOINCREMENT, operation_time INTEGER, operation_type INTEGER, table_name TEXT, id) - let turso_cdc_registers = program.alloc_registers(5); + // (operation_id INTEGER PRIMARY KEY AUTOINCREMENT, operation_time INTEGER, operation_type INTEGER, table_name TEXT, id, before BLOB, after BLOB) + let turso_cdc_registers = program.alloc_registers(7); program.emit_insn(Insn::Null { dest: turso_cdc_registers, dest_end: None, @@ -1231,6 +1340,28 @@ pub fn emit_cdc_insns( amount: 0, }); + if let Some(before_record_reg) = before_record_reg { + program.emit_insn(Insn::Copy { + src_reg: before_record_reg, + dst_reg: turso_cdc_registers + 5, + amount: 0, + }); + } else { + program.emit_null(turso_cdc_registers + 5, None); + program.mark_last_insn_constant(); + } + + if let Some(after_record_reg) = after_record_reg { + program.emit_insn(Insn::Copy { + src_reg: after_record_reg, + dst_reg: turso_cdc_registers + 6, + amount: 0, + }); + } else { + program.emit_null(turso_cdc_registers + 6, None); + program.mark_last_insn_constant(); + } + let rowid_reg = program.alloc_register(); program.emit_insn(Insn::NewRowid { cursor: cdc_cursor_id, @@ -1241,7 +1372,7 @@ pub fn emit_cdc_insns( let record_reg = program.alloc_register(); program.emit_insn(Insn::MakeRecord { start_reg: turso_cdc_registers, - count: 5, + count: 7, dest_reg: record_reg, index_name: None, }); diff --git a/core/translate/expr.rs b/core/translate/expr.rs index ea36918b6..bc82c1cae 100644 --- a/core/translate/expr.rs +++ b/core/translate/expr.rs @@ -1730,13 +1730,64 @@ pub fn translate_expr( start_reg, resolver, )?; - program.emit_insn(Insn::Copy { src_reg: start_reg, dst_reg: target_register, amount: 0, }); - + Ok(target_register) + } + ScalarFunc::TableColumnsJsonArray => { + if args.is_none() || args.as_ref().unwrap().len() != 1 { + crate::bail_parse_error!( + "table_columns_json_array() function must have exactly 1 argument", + ); + } + let args = args.as_ref().unwrap(); + let start_reg = program.alloc_register(); + translate_expr( + program, + referenced_tables, + &args[0], + start_reg, + resolver, + )?; + program.emit_insn(Insn::Function { + constant_mask: 0, + start_reg, + dest: target_register, + func: func_ctx, + }); + Ok(target_register) + } + ScalarFunc::BinRecordJsonObject => { + if args.is_none() || args.as_ref().unwrap().len() != 2 { + crate::bail_parse_error!( + "bin_record_json_object() function must have exactly 2 arguments", + ); + } + let args = args.as_ref().unwrap(); + let start_reg = program.alloc_registers(2); + translate_expr( + program, + referenced_tables, + &args[0], + start_reg, + resolver, + )?; + translate_expr( + program, + referenced_tables, + &args[1], + start_reg + 1, + resolver, + )?; + program.emit_insn(Insn::Function { + constant_mask: 0, + start_reg, + dest: target_register, + func: func_ctx, + }); Ok(target_register) } } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 71127d4cb..070e7d3c8 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -6,7 +6,7 @@ use turso_sqlite3_parser::ast::{ use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY}; use crate::schema::{IndexColumn, Table}; -use crate::translate::emitter::{emit_cdc_insns, OperationMode}; +use crate::translate::emitter::{emit_cdc_insns, emit_cdc_patch_record, OperationMode}; use crate::util::normalize_ident; use crate::vdbe::builder::ProgramBuilderOpts; use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral}; @@ -444,18 +444,6 @@ pub fn translate_insert( _ => (), } - // Write record to the turso_cdc table if necessary - if let Some((cdc_cursor_id, _)) = &cdc_table { - emit_cdc_insns( - &mut program, - &resolver, - OperationMode::INSERT, - *cdc_cursor_id, - rowid_reg, - &table_name.0, - )?; - } - let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?; for index_col_mapping in index_col_mappings { // find which cursor we opened earlier for this index @@ -574,7 +562,6 @@ pub fn translate_insert( dest_reg: record_register, index_name: None, }); - program.emit_insn(Insn::Insert { cursor: cursor_id, key_reg: rowid_reg, @@ -583,6 +570,32 @@ pub fn translate_insert( table_name: table_name.to_string(), }); + // Emit update in the CDC table if necessary (after the INSERT updated the table) + if let Some((cdc_cursor_id, _)) = &cdc_table { + let cdc_has_after = program.capture_data_changes_mode().has_after(); + let after_record_reg = if cdc_has_after { + Some(emit_cdc_patch_record( + &mut program, + &table, + column_registers_start, + record_register, + rowid_reg, + )) + } else { + None + }; + emit_cdc_insns( + &mut program, + &resolver, + OperationMode::INSERT, + *cdc_cursor_id, + rowid_reg, + None, + after_record_reg, + &table_name.0, + )?; + } + if inserting_multiple_rows { if let Some(temp_table_ctx) = temp_table_ctx { program.emit_insn(Insn::Next { diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 0f8fb5b88..45f554fc9 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -510,5 +510,21 @@ fn turso_cdc_table_columns() -> Vec { col_type: None, constraints: vec![], }, + ast::ColumnDefinition { + col_name: ast::Name("before".to_string()), + col_type: Some(ast::Type { + name: "BLOB".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("after".to_string()), + col_type: Some(ast::Type { + name: "BLOB".to_string(), + size: None, + }), + constraints: vec![], + }, ] } diff --git a/core/types.rs b/core/types.rs index 0d866bac3..df477cfdd 100644 --- a/core/types.rs +++ b/core/types.rs @@ -48,7 +48,7 @@ impl Display for ValueType { } } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum TextSubtype { Text, @@ -127,6 +127,10 @@ impl Display for TextRef { } impl TextRef { + pub fn create_from(value: &[u8], subtype: TextSubtype) -> Self { + let value = RawSlice::create_from(value); + Self { value, subtype } + } pub fn as_str(&self) -> &str { unsafe { std::str::from_utf8_unchecked(self.value.to_slice()) } } @@ -855,19 +859,29 @@ impl ImmutableRecord { cursor.get_values(self).unwrap_or_default() } - pub fn from_registers<'a>( - registers: impl IntoIterator + Copy, + pub fn from_registers<'a, I: Iterator + Clone>( + // we need to accept both &[Register] and &[&Register] values - that's why non-trivial signature + // + // std::slice::Iter under the hood just stores pointer and length of slice and also implements a Clone which just copy those meta-values + // (without copying the data itself) + registers: impl IntoIterator, len: usize, ) -> Self { - let mut values = Vec::with_capacity(len); + Self::from_values(registers.into_iter().map(|x| x.get_owned_value()), len) + } + + pub fn from_values<'a>( + values: impl IntoIterator + Clone, + len: usize, + ) -> Self { + let mut ref_values = Vec::with_capacity(len); let mut serials = Vec::with_capacity(len); let mut size_header = 0; let mut size_values = 0; let mut serial_type_buf = [0; 9]; // write serial types - for value in registers { - let value = value.get_owned_value(); + for value in values.clone() { let serial_type = SerialType::from(value); let n = write_varint(&mut serial_type_buf[0..], serial_type.into()); serials.push((serial_type_buf, n)); @@ -916,15 +930,14 @@ impl ImmutableRecord { } // write content - for value in registers { - let value = value.get_owned_value(); + for value in values { let start_offset = writer.pos; match value { Value::Null => { - values.push(RefValue::Null); + ref_values.push(RefValue::Null); } Value::Integer(i) => { - values.push(RefValue::Integer(*i)); + ref_values.push(RefValue::Integer(*i)); let serial_type = SerialType::from(value); match serial_type.kind() { SerialTypeKind::ConstInt0 | SerialTypeKind::ConstInt1 => {} @@ -940,7 +953,7 @@ impl ImmutableRecord { } } Value::Float(f) => { - values.push(RefValue::Float(*f)); + ref_values.push(RefValue::Float(*f)); writer.extend_from_slice(&f.to_be_bytes()) } Value::Text(t) => { @@ -950,16 +963,16 @@ impl ImmutableRecord { let ptr = unsafe { writer.buf.as_ptr().add(start_offset) }; let value = RefValue::Text(TextRef { value: RawSlice::new(ptr, len), - subtype: t.subtype.clone(), + subtype: t.subtype, }); - values.push(value); + ref_values.push(value); } Value::Blob(b) => { writer.extend_from_slice(b); let end_offset = writer.pos; let len = end_offset - start_offset; let ptr = unsafe { writer.buf.as_ptr().add(start_offset) }; - values.push(RefValue::Blob(RawSlice::new(ptr, len))); + ref_values.push(RefValue::Blob(RawSlice::new(ptr, len))); } }; } @@ -1352,7 +1365,7 @@ impl RefValue { RefValue::Float(f) => Value::Float(*f), RefValue::Text(text_ref) => Value::Text(Text { value: text_ref.value.to_slice().to_vec(), - subtype: text_ref.subtype.clone(), + subtype: text_ref.subtype, }), RefValue::Blob(b) => Value::Blob(b.to_slice().to_vec()), } @@ -2337,6 +2350,14 @@ pub enum SeekKey<'a> { } impl RawSlice { + pub fn create_from(value: &[u8]) -> Self { + if value.is_empty() { + RawSlice::new(std::ptr::null(), 0) + } else { + let ptr = &value[0] as *const u8; + RawSlice::new(ptr, value.len()) + } + } pub fn new(data: *const u8, len: usize) -> Self { Self { data, len } } @@ -2405,7 +2426,7 @@ mod tests { Value::Float(f) => RefValue::Float(*f), Value::Text(text) => RefValue::Text(TextRef { value: RawSlice::from_slice(&text.value), - subtype: text.subtype.clone(), + subtype: text.subtype, }), Value::Blob(blob) => RefValue::Blob(RawSlice::from_slice(blob)), } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index f7ab218a7..9079d23a0 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -79,8 +79,8 @@ use std::{cell::RefCell, collections::HashMap}; #[cfg(feature = "json")] use crate::{ - function::JsonFunc, json::convert_dbtype_to_raw_jsonb, json::get_json, json::is_json_valid, - json::json_array, json::json_array_length, json::json_arrow_extract, + function::JsonFunc, json, json::convert_dbtype_to_raw_jsonb, json::get_json, + json::is_json_valid, json::json_array, json::json_array_length, json::json_arrow_extract, json::json_arrow_shift_extract, json::json_error_position, json::json_extract, json::json_from_raw_bytes_agg, json::json_insert, json::json_object, json::json_patch, json::json_quote, json::json_remove, json::json_replace, json::json_set, json::json_type, @@ -1597,7 +1597,7 @@ pub fn op_column( if existing_text.value.capacity() >= new_text.value.len() { existing_text.value.clear(); existing_text.value.extend_from_slice(&new_text.value); - existing_text.subtype = new_text.subtype.clone(); + existing_text.subtype = new_text.subtype; } else { state.registers[*dest] = Register::Value(value); } @@ -4040,6 +4040,132 @@ pub fn op_function( .exec_likelihood(probability.get_owned_value()); state.registers[*dest] = Register::Value(result); } + ScalarFunc::TableColumnsJsonArray => { + assert_eq!(arg_count, 1); + #[cfg(not(feature = "json"))] + { + return Err(LimboError::InvalidArgument( + "table_columns_json_array: turso must be compiled with JSON support" + .to_string(), + )); + } + #[cfg(feature = "json")] + { + use crate::types::{TextRef, TextSubtype}; + + let table = state.registers[*start_reg].get_owned_value(); + let Value::Text(table) = table else { + return Err(LimboError::InvalidArgument( + "table_columns_json_array: function argument must be of type TEXT" + .to_string(), + )); + }; + let table = { + let schema = program.connection.schema.borrow(); + match schema.get_table(table.as_str()) { + Some(table) => table, + None => { + return Err(LimboError::InvalidArgument(format!( + "table_columns_json_array: table {} doesn't exists", + table + ))) + } + } + }; + + let mut json = json::jsonb::Jsonb::make_empty_array(table.columns().len() * 10); + for column in table.columns() { + let name = column.name.as_ref().unwrap(); + let name_json = json::convert_ref_dbtype_to_jsonb( + &RefValue::Text(TextRef::create_from( + name.as_str().as_bytes(), + TextSubtype::Text, + )), + json::Conv::ToString, + )?; + json.append_jsonb_to_end(name_json.data()); + } + json.finalize_unsafe(json::jsonb::ElementType::ARRAY)?; + state.registers[*dest] = Register::Value(json::json_string_to_db_type( + json, + json::jsonb::ElementType::ARRAY, + json::OutputVariant::String, + )?); + } + } + ScalarFunc::BinRecordJsonObject => { + assert_eq!(arg_count, 2); + #[cfg(not(feature = "json"))] + { + return Err(LimboError::InvalidArgument( + "bin_record_json_object: turso must be compiled with JSON support" + .to_string(), + )); + } + #[cfg(feature = "json")] + 'outer: { + use crate::types::RecordCursor; + use std::str::FromStr; + + let columns_str = state.registers[*start_reg].get_owned_value(); + let bin_record = state.registers[*start_reg + 1].get_owned_value(); + let Value::Text(columns_str) = columns_str else { + return Err(LimboError::InvalidArgument( + "bin_record_json_object: function arguments must be of type TEXT and BLOB correspondingly".to_string() + )); + }; + + if let Value::Null = bin_record { + state.registers[*dest] = Register::Value(Value::Null); + break 'outer; + } + + let Value::Blob(bin_record) = bin_record else { + return Err(LimboError::InvalidArgument( + "bin_record_json_object: function arguments must be of type TEXT and BLOB correspondingly".to_string() + )); + }; + let mut columns_json_array = + json::jsonb::Jsonb::from_str(columns_str.as_str())?; + let columns_len = columns_json_array.array_len()?; + + let mut record = ImmutableRecord::new(bin_record.len()); + record.start_serialization(bin_record); + let mut record_cursor = RecordCursor::new(); + + let mut json = json::jsonb::Jsonb::make_empty_obj(columns_len); + for i in 0..columns_len { + let mut op = json::jsonb::SearchOperation::new(0); + let path = json::path::JsonPath { + elements: vec![ + json::path::PathElement::Root(), + json::path::PathElement::ArrayLocator(Some(i as i32)), + ], + }; + + columns_json_array.operate_on_path(&path, &mut op)?; + let column_name = op.result(); + json.append_jsonb_to_end(column_name.data()); + + let val = record_cursor.get_value(&record, i)?; + if let RefValue::Blob(..) = val { + return Err(LimboError::InvalidArgument( + "bin_record_json_object: formatting of BLOB values stored in binary record is not supported".to_string() + )); + } + let val_json = + json::convert_ref_dbtype_to_jsonb(&val, json::Conv::NotStrict)?; + json.append_jsonb_to_end(val_json.data()); + } + json.finalize_unsafe(json::jsonb::ElementType::OBJECT)?; + + state.registers[*dest] = Register::Value(json::json_string_to_db_type( + json, + json::jsonb::ElementType::OBJECT, + json::OutputVariant::String, + )?); + } + } }, crate::function::Func::Vector(vector_func) => match vector_func { VectorFunc::Vector => { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 776cbb9f8..56a90dcef 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -598,7 +598,7 @@ pub fn registers_to_ref_values(registers: &[Register]) -> Vec { Value::Float(f) => RefValue::Float(*f), Value::Text(t) => RefValue::Text(TextRef { value: RawSlice::new(t.value.as_ptr(), t.value.len()), - subtype: t.subtype.clone(), + subtype: t.subtype, }), Value::Blob(b) => RefValue::Blob(RawSlice::new(b.as_ptr(), b.len())), } diff --git a/tests/integration/functions/test_cdc.rs b/tests/integration/functions/test_cdc.rs index e69751b68..1d683eefc 100644 --- a/tests/integration/functions/test_cdc.rs +++ b/tests/integration/functions/test_cdc.rs @@ -1,4 +1,5 @@ use rusqlite::types::Value; +use turso_core::types::ImmutableRecord; use crate::common::{limbo_exec_rows, TempDatabase}; @@ -14,10 +15,10 @@ fn replace_column_with_null(rows: Vec>, column: usize) -> Vec(values: [Value; N]) -> Vec { + let values = values + .into_iter() + .map(|x| match x { + Value::Null => turso_core::Value::Null, + Value::Integer(x) => turso_core::Value::Integer(x), + Value::Real(x) => turso_core::Value::Float(x), + Value::Text(x) => turso_core::Value::Text(turso_core::types::Text::new(&x)), + Value::Blob(x) => turso_core::Value::Blob(x), + }) + .collect::>(); + ImmutableRecord::from_values(&values, values.len()) + .get_payload() + .to_vec() +} + +#[test] +fn test_cdc_simple_before() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('before')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1, 2), (3, 4)").unwrap(); + conn.execute("UPDATE t SET y = 3 WHERE x = 1").unwrap(); + conn.execute("DELETE FROM t WHERE x = 3").unwrap(); + conn.execute("DELETE FROM t WHERE x = 1").unwrap(); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Null, + Value::Null, + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Null, + Value::Null, + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(0), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Blob(record([Value::Integer(1), Value::Integer(2)])), + Value::Null, + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Blob(record([Value::Integer(3), Value::Integer(4)])), + Value::Null, + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Blob(record([Value::Integer(1), Value::Integer(3)])), + Value::Null, + ] + ] + ); +} + +#[test] +fn test_cdc_simple_after() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('after')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1, 2), (3, 4)").unwrap(); + conn.execute("UPDATE t SET y = 3 WHERE x = 1").unwrap(); + conn.execute("DELETE FROM t WHERE x = 3").unwrap(); + conn.execute("DELETE FROM t WHERE x = 1").unwrap(); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Null, + Value::Blob(record([Value::Integer(1), Value::Integer(2)])), + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Null, + Value::Blob(record([Value::Integer(3), Value::Integer(4)])), + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(0), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Null, + Value::Blob(record([Value::Integer(1), Value::Integer(3)])), + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Null, + Value::Null, + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Null, + Value::Null, + ] + ] + ); +} + +#[test] +fn test_cdc_simple_full() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('full')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1, 2), (3, 4)").unwrap(); + conn.execute("UPDATE t SET y = 3 WHERE x = 1").unwrap(); + conn.execute("DELETE FROM t WHERE x = 3").unwrap(); + conn.execute("DELETE FROM t WHERE x = 1").unwrap(); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Null, + Value::Blob(record([Value::Integer(1), Value::Integer(2)])), + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Null, + Value::Blob(record([Value::Integer(3), Value::Integer(4)])), + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(0), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Blob(record([Value::Integer(1), Value::Integer(2)])), + Value::Blob(record([Value::Integer(1), Value::Integer(3)])), + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(3), + Value::Blob(record([Value::Integer(3), Value::Integer(4)])), + Value::Null, + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1), + Value::Blob(record([Value::Integer(1), Value::Integer(3)])), + Value::Null, ] ] ); @@ -57,7 +276,7 @@ fn test_cdc_simple() { fn test_cdc_crud() { let db = TempDatabase::new_empty(false); let conn = db.connect_limbo(); - conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + conn.execute("PRAGMA unstable_capture_data_changes_conn('id')") .unwrap(); conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") .unwrap(); @@ -85,63 +304,81 @@ fn test_cdc_crud() { Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(20) + Value::Integer(20), + Value::Null, + Value::Null, ], vec![ Value::Integer(2), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(10) + Value::Integer(10), + Value::Null, + Value::Null, ], vec![ Value::Integer(3), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(5) + Value::Integer(5), + Value::Null, + Value::Null, ], vec![ Value::Integer(4), Value::Null, Value::Integer(0), Value::Text("t".to_string()), - Value::Integer(5) + Value::Integer(5), + Value::Null, + Value::Null, ], vec![ Value::Integer(5), Value::Null, Value::Integer(-1), Value::Text("t".to_string()), - Value::Integer(10) + Value::Integer(10), + Value::Null, + Value::Null, ], vec![ Value::Integer(6), Value::Null, Value::Integer(-1), Value::Text("t".to_string()), - Value::Integer(20) + Value::Integer(20), + Value::Null, + Value::Null, ], vec![ Value::Integer(7), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(1) + Value::Integer(1), + Value::Null, + Value::Null, ], vec![ Value::Integer(8), Value::Null, Value::Integer(-1), Value::Text("t".to_string()), - Value::Integer(1) + Value::Integer(1), + Value::Null, + Value::Null, ], vec![ Value::Integer(9), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(2) + Value::Integer(2), + Value::Null, + Value::Null, ], ] ); @@ -151,7 +388,7 @@ fn test_cdc_crud() { fn test_cdc_failed_op() { let db = TempDatabase::new_empty(true); let conn = db.connect_limbo(); - conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + conn.execute("PRAGMA unstable_capture_data_changes_conn('id')") .unwrap(); conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") .unwrap(); @@ -182,28 +419,36 @@ fn test_cdc_failed_op() { Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(1) + Value::Integer(1), + Value::Null, + Value::Null, ], vec![ Value::Integer(2), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(2) + Value::Integer(2), + Value::Null, + Value::Null, ], vec![ Value::Integer(3), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(6) + Value::Integer(6), + Value::Null, + Value::Null, ], vec![ Value::Integer(4), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(7) + Value::Integer(7), + Value::Null, + Value::Null, ], ] ); @@ -218,13 +463,13 @@ fn test_cdc_uncaptured_connection() { .unwrap(); conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); conn1 - .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .execute("PRAGMA unstable_capture_data_changes_conn('id')") .unwrap(); conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); // captured let conn2 = db.connect_limbo(); conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap(); conn2 - .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .execute("PRAGMA unstable_capture_data_changes_conn('id')") .unwrap(); conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap(); // captured conn2 @@ -260,21 +505,27 @@ fn test_cdc_uncaptured_connection() { Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(2) + Value::Integer(2), + Value::Null, + Value::Null, ], vec![ Value::Integer(2), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(4) + Value::Integer(4), + Value::Null, + Value::Null, ], vec![ Value::Integer(3), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(6) + Value::Integer(6), + Value::Null, + Value::Null, ], ] ); @@ -288,7 +539,7 @@ fn test_cdc_custom_table() { .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") .unwrap(); conn1 - .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc')") .unwrap(); conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); @@ -310,14 +561,18 @@ fn test_cdc_custom_table() { Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(1) + Value::Integer(1), + Value::Null, + Value::Null, ], vec![ Value::Integer(2), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(2) + Value::Integer(2), + Value::Null, + Value::Null, ], ] ); @@ -331,7 +586,7 @@ fn test_cdc_ignore_changes_in_cdc_table() { .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") .unwrap(); conn1 - .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc')") .unwrap(); conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); @@ -355,7 +610,9 @@ fn test_cdc_ignore_changes_in_cdc_table() { Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(2) + Value::Integer(2), + Value::Null, + Value::Null, ],] ); } @@ -371,7 +628,7 @@ fn test_cdc_transaction() { .execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y UNIQUE)") .unwrap(); conn1 - .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc')") .unwrap(); conn1.execute("BEGIN").unwrap(); conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); @@ -394,35 +651,45 @@ fn test_cdc_transaction() { Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(1) + Value::Integer(1), + Value::Null, + Value::Null, ], vec![ Value::Integer(2), Value::Null, Value::Integer(1), Value::Text("q".to_string()), - Value::Integer(2) + Value::Integer(2), + Value::Null, + Value::Null, ], vec![ Value::Integer(3), Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(3) + Value::Integer(3), + Value::Null, + Value::Null, ], vec![ Value::Integer(4), Value::Null, Value::Integer(-1), Value::Text("t".to_string()), - Value::Integer(1) + Value::Integer(1), + Value::Null, + Value::Null, ], vec![ Value::Integer(5), Value::Null, Value::Integer(0), Value::Text("q".to_string()), - Value::Integer(2) + Value::Integer(2), + Value::Null, + Value::Null, ], ] ); @@ -434,10 +701,10 @@ fn test_cdc_independent_connections() { let conn1 = db.connect_limbo(); let conn2 = db.connect_limbo(); conn1 - .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')") + .execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc1')") .unwrap(); conn2 - .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')") + .execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc2')") .unwrap(); conn1 .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") @@ -461,7 +728,9 @@ fn test_cdc_independent_connections() { Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(1) + Value::Integer(1), + Value::Null, + Value::Null, ]] ); let rows = @@ -473,7 +742,9 @@ fn test_cdc_independent_connections() { Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(2) + Value::Integer(2), + Value::Null, + Value::Null, ]] ); } @@ -484,10 +755,10 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() { let conn1 = db.connect_limbo(); let conn2 = db.connect_limbo(); conn1 - .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')") + .execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc1')") .unwrap(); conn2 - .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')") + .execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc2')") .unwrap(); conn1 .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") @@ -522,14 +793,18 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() { Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(2) + Value::Integer(2), + Value::Null, + Value::Null, ], vec![ Value::Integer(3), Value::Null, Value::Integer(-1), Value::Text("custom_cdc2".to_string()), - Value::Integer(1) + Value::Integer(1), + Value::Null, + Value::Null, ] ] ); @@ -543,15 +818,64 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() { Value::Null, Value::Integer(1), Value::Text("t".to_string()), - Value::Integer(4) + Value::Integer(4), + Value::Null, + Value::Null, ], vec![ Value::Integer(3), Value::Null, Value::Integer(-1), Value::Text("custom_cdc1".to_string()), - Value::Integer(1) + Value::Integer(1), + Value::Null, + Value::Null, ] ] ); } + +#[test] +fn test_cdc_table_columns() { + let db = TempDatabase::new_empty(true); + let conn = db.connect_limbo(); + conn.execute("CREATE TABLE t(a INTEGER PRIMARY KEY, b, c UNIQUE)") + .unwrap(); + let rows = limbo_exec_rows(&db, &conn, "SELECT table_columns_json_array('t')"); + assert_eq!( + rows, + vec![vec![Value::Text(r#"["a","b","c"]"#.to_string())]] + ); + conn.execute("ALTER TABLE t DROP COLUMN b").unwrap(); + let rows = limbo_exec_rows(&db, &conn, "SELECT table_columns_json_array('t')"); + assert_eq!(rows, vec![vec![Value::Text(r#"["a","c"]"#.to_string())]]); +} + +#[test] +fn test_cdc_bin_record() { + let db = TempDatabase::new_empty(true); + let conn = db.connect_limbo(); + let record = record([ + Value::Null, + Value::Integer(1), + // use golden ratio instead of pi because clippy has weird rule that I can't use PI approximation written by hand + Value::Real(1.61803), + Value::Text("hello".to_string()), + ]); + let mut record_hex = String::new(); + for byte in record { + record_hex.push_str(&format!("{:02X}", byte)); + } + + let rows = limbo_exec_rows( + &db, + &conn, + &format!(r#"SELECT bin_record_json_object('["a","b","c","d"]', X'{record_hex}')"#), + ); + assert_eq!( + rows, + vec![vec![Value::Text( + r#"{"a":null,"b":1,"c":1.61803,"d":"hello"}"#.to_string() + )]] + ); +}