Merge 'CDC functions' from Nikita Sivukhin

This PR adds few functions to the `turso-db` in order to simplify
exploration of CDC table. Later we will also add API to work with
changes from the code - but SQL support is also useful.
So, this PR adds 2 functions:
1. `table_columns_json_array('<table-name>')` - returns list of current
table column **names** as a single string in JSON array format
2. `bin_record_json_object('<columns-array>', x'<bin-record>')` -
convert record in the SQLite format to the JSON object with keys from
`columns-array`
So, this functions can be used together to extract changes in human-
readable format:
```sql
turso> PRAGMA unstable_capture_data_changes_conn('full');
turso> CREATE TABLE t(a INTEGER PRIMARY KEY, b);
turso> INSERT INTO t VALUES (1, 2), (3, 4);
turso> UPDATE t SET b = 20 WHERE a = 1;
turso> UPDATE t SET a = 30, b = 40 WHERE a = 3;
turso> DELETE FROM t WHERE a = 1;
turso> SELECT
    bin_record_json_object(table_columns_json_array('t'), before) before,
    bin_record_json_object(table_columns_json_array('t'), after) after
    FROM turso_cdc;
┌─────────────────┬────────────────┐
│ before          │ after          │
├─────────────────┼────────────────┤
│                 │ {"a":1,"b":2}  │
├─────────────────┼────────────────┤
│                 │ {"a":3,"b":4}  │
├─────────────────┼────────────────┤
│ {"a":1,"b":2}   │ {"a":1,"b":20} │
├─────────────────┼────────────────┤
│ {"a":3,"b":4}   │                │
├─────────────────┼────────────────┤
│ {"a":30,"b":40} │                │
├─────────────────┼────────────────┤
│ {"a":1,"b":20}  │                │
└─────────────────┴────────────────┘
```
Initially, I thought to implement single function like
`bin_record_json_object('<table-name', x'<bin-record')` but this design
has certain flaws:
1. In case of schema changes this function can return incorrect result
(imagine that you dropped a column and now JSON from CDC mentions some
random subset of columns). While this feature is unstable - `turso-db`
should avoid silent incorrect behavior at all cost
2. Single-function design provide no way to deal with schema changes
3. The API is unsound and user can think that under the hood `turso-db`
will select proper schema for the record (but this is actually
impossible with current CDC implementation)
So, I decided to stop with two-functions design which cover drawbacks
mentioned above to some extent
1. First concern still remains valid
2. Two-functions design provides a way to deal with schema changes. For
example, user can maintain simple `cdc_schema_changes` table and log
result of `table_columns_json_array` before applying breaking schema
changes.
    * Obviously, this is not ideal UX - but it suits my needs: I don't
want to design schema changes capturing, but also I don't want to block
users and provide a way to have a workaround for scenarios which are not
natively supported by CDC
3. Subjectively, I think that API became a bit more clear about the
machinery of these two functions as user see that it extract column list
of the table (without any context) and then feed it to the
`bin_record_json_object` function.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2057
This commit is contained in:
Pekka Enberg
2025-07-14 11:54:17 +03:00
12 changed files with 880 additions and 156 deletions

View File

@@ -318,6 +318,8 @@ pub enum ScalarFunc {
Likely,
TimeDiff,
Likelihood,
TableColumnsJsonArray,
BinRecordJsonObject,
}
impl ScalarFunc {
@@ -375,6 +377,8 @@ impl ScalarFunc {
ScalarFunc::Likely => true,
ScalarFunc::TimeDiff => false,
ScalarFunc::Likelihood => true,
ScalarFunc::TableColumnsJsonArray => true, // while columns of the table can change with DDL statements, within single query plan it's static
ScalarFunc::BinRecordJsonObject => true,
}
}
}
@@ -434,6 +438,8 @@ impl Display for ScalarFunc {
Self::Likely => "likely".to_string(),
Self::TimeDiff => "timediff".to_string(),
Self::Likelihood => "likelihood".to_string(),
Self::TableColumnsJsonArray => "table_columns_json_array".to_string(),
Self::BinRecordJsonObject => "bin_record_json_object".to_string(),
};
write!(f, "{str}")
}
@@ -777,6 +783,8 @@ impl Func {
"unhex" => Ok(Self::Scalar(ScalarFunc::Unhex)),
"zeroblob" => Ok(Self::Scalar(ScalarFunc::ZeroBlob)),
"soundex" => Ok(Self::Scalar(ScalarFunc::Soundex)),
"table_columns_json_array" => Ok(Self::Scalar(ScalarFunc::TableColumnsJsonArray)),
"bin_record_json_object" => Ok(Self::Scalar(ScalarFunc::BinRecordJsonObject)),
"acos" => Ok(Self::Math(MathFunc::Acos)),
"acosh" => Ok(Self::Math(MathFunc::Acosh)),
"asin" => Ok(Self::Math(MathFunc::Asin)),

View File

@@ -1,8 +1,8 @@
mod cache;
mod error;
mod jsonb;
pub(crate) mod jsonb;
mod ops;
mod path;
pub(crate) mod path;
use crate::json::error::Error as JsonError;
pub use crate::json::ops::{
@@ -10,9 +10,9 @@ pub use crate::json::ops::{
jsonb_replace,
};
use crate::json::path::{json_path, JsonPath, PathElement};
use crate::types::{Text, TextSubtype, Value, ValueType};
use crate::types::{RawSlice, Text, TextRef, TextSubtype, Value, ValueType};
use crate::vdbe::Register;
use crate::{bail_constraint_error, bail_parse_error, LimboError};
use crate::{bail_constraint_error, bail_parse_error, LimboError, RefValue};
pub use cache::JsonCacheCell;
use jsonb::{ElementType, Jsonb, JsonbHeader, PathOperationMode, SearchOperation, SetOperation};
use std::borrow::Cow;
@@ -26,7 +26,7 @@ pub enum Conv {
}
#[cfg(feature = "json")]
enum OutputVariant {
pub enum OutputVariant {
ElementType,
Binary,
String,
@@ -100,9 +100,24 @@ pub fn json_from_raw_bytes_agg(data: &[u8], raw: bool) -> crate::Result<Value> {
}
}
fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result<Jsonb> {
pub fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result<Jsonb> {
convert_ref_dbtype_to_jsonb(
&match val {
Value::Null => RefValue::Null,
Value::Integer(x) => RefValue::Integer(*x),
Value::Float(x) => RefValue::Float(*x),
Value::Text(text) => {
RefValue::Text(TextRef::create_from(text.as_str().as_bytes(), text.subtype))
}
Value::Blob(items) => RefValue::Blob(RawSlice::create_from(items)),
},
strict,
)
}
pub fn convert_ref_dbtype_to_jsonb(val: &RefValue, strict: Conv) -> crate::Result<Jsonb> {
match val {
Value::Text(text) => {
RefValue::Text(text) => {
let res = if text.subtype == TextSubtype::Json || matches!(strict, Conv::Strict) {
// Parse directly as JSON if it's already JSON subtype or strict mode is on
let json = if matches!(strict, Conv::ToString) {
@@ -125,20 +140,20 @@ fn convert_dbtype_to_jsonb(val: &Value, strict: Conv) -> crate::Result<Jsonb> {
};
res.map_err(|_| LimboError::ParseError("malformed JSON".to_string()))
}
Value::Blob(blob) => {
let json = Jsonb::from_raw_data(blob);
RefValue::Blob(blob) => {
let json = Jsonb::from_raw_data(blob.to_slice());
json.is_valid()?;
Ok(json)
}
Value::Null => Ok(Jsonb::from_raw_data(
RefValue::Null => Ok(Jsonb::from_raw_data(
JsonbHeader::make_null().into_bytes().as_bytes(),
)),
Value::Float(float) => {
RefValue::Float(float) => {
let mut buff = ryu::Buffer::new();
Jsonb::from_str(buff.format(*float))
.map_err(|_| LimboError::ParseError("malformed JSON".to_string()))
}
Value::Integer(int) => Jsonb::from_str(&int.to_string())
RefValue::Integer(int) => Jsonb::from_str(&int.to_string())
.map_err(|_| LimboError::ParseError("malformed JSON".to_string())),
}
}
@@ -405,7 +420,7 @@ fn jsonb_extract_internal(value: Jsonb, paths: &[Register]) -> crate::Result<(Js
Ok((result, ElementType::ARRAY))
}
fn json_string_to_db_type(
pub fn json_string_to_db_type(
json: Jsonb,
element_type: ElementType,
flag: OutputVariant,

View File

@@ -443,7 +443,10 @@ fn get_schema_version(conn: &Arc<Connection>) -> Result<u32> {
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CaptureDataChangesMode {
Off,
RowidOnly { table: String },
Id { table: String },
Before { table: String },
After { table: String },
Full { table: String },
}
impl CaptureDataChangesMode {
@@ -453,22 +456,43 @@ impl CaptureDataChangesMode {
.unwrap_or((value, TURSO_CDC_DEFAULT_TABLE_NAME));
match mode {
"off" => Ok(CaptureDataChangesMode::Off),
"rowid-only" => Ok(CaptureDataChangesMode::RowidOnly { table: table.to_string() }),
"id" => Ok(CaptureDataChangesMode::Id { table: table.to_string() }),
"before" => Ok(CaptureDataChangesMode::Before { table: table.to_string() }),
"after" => Ok(CaptureDataChangesMode::After { table: table.to_string() }),
"full" => Ok(CaptureDataChangesMode::Full { table: table.to_string() }),
_ => Err(LimboError::InvalidArgument(
"unexpected pragma value: expected '<mode>' or '<mode>,<cdc-table-name>' parameter where mode is one of off|rowid-only".to_string(),
"unexpected pragma value: expected '<mode>' or '<mode>,<cdc-table-name>' parameter where mode is one of off|id|before|after|full".to_string(),
))
}
}
pub fn has_after(&self) -> bool {
matches!(
self,
CaptureDataChangesMode::After { .. } | CaptureDataChangesMode::Full { .. }
)
}
pub fn has_before(&self) -> bool {
matches!(
self,
CaptureDataChangesMode::Before { .. } | CaptureDataChangesMode::Full { .. }
)
}
pub fn mode_name(&self) -> &str {
match self {
CaptureDataChangesMode::Off => "off",
CaptureDataChangesMode::RowidOnly { .. } => "rowid-only",
CaptureDataChangesMode::Id { .. } => "id",
CaptureDataChangesMode::Before { .. } => "before",
CaptureDataChangesMode::After { .. } => "after",
CaptureDataChangesMode::Full { .. } => "full",
}
}
pub fn table(&self) -> Option<&str> {
match self {
CaptureDataChangesMode::Off => None,
CaptureDataChangesMode::RowidOnly { table } => Some(table.as_str()),
CaptureDataChangesMode::Id { table }
| CaptureDataChangesMode::Before { table }
| CaptureDataChangesMode::After { table }
| CaptureDataChangesMode::Full { table } => Some(table.as_str()),
}
}
}

View File

@@ -1160,17 +1160,12 @@ pub fn read_value(buf: &[u8], serial_type: SerialType) -> Result<(RefValue, usiz
content_size
);
}
let slice = if content_size == 0 {
RawSlice::new(std::ptr::null(), 0)
} else {
let ptr = &buf[0] as *const u8;
RawSlice::new(ptr, content_size)
};
Ok((
RefValue::Text(TextRef {
value: slice,
subtype: TextSubtype::Text,
}),
RefValue::Text(TextRef::create_from(
&buf[..content_size],
TextSubtype::Text,
)),
content_size,
))
}

View File

@@ -22,7 +22,7 @@ use super::select::emit_simple_count;
use super::subquery::emit_subqueries;
use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
use crate::function::Func;
use crate::schema::Schema;
use crate::schema::{Schema, Table};
use crate::translate::compound_select::emit_program_for_compound_select;
use crate::translate::plan::{DeletePlan, Plan, QueryDestination, Search};
use crate::translate::values::emit_values;
@@ -149,7 +149,7 @@ pub struct TranslateCtx<'a> {
/// - First: all `GROUP BY` expressions, in the order they appear in the `GROUP BY` clause.
/// - Then: remaining non-aggregate expressions that are not part of `GROUP BY`.
pub non_aggregate_expressions: Vec<(&'a Expr, bool)>,
/// Cursor id for turso_cdc table (if capture_data_changes=on is set and query can modify the data)
/// Cursor id for cdc table (if capture_data_changes PRAGMA is set and query can modify the data)
pub cdc_cursor_id: Option<usize>,
}
@@ -570,18 +570,32 @@ fn emit_delete_insns(
}
}
if let Some(turso_cdc_cursor_id) = t_ctx.cdc_cursor_id {
// Emit update in the CDC table if necessary (before DELETE updated the table)
if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id {
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::RowId {
cursor_id: main_table_cursor_id,
dest: rowid_reg,
});
let cdc_has_before = program.capture_data_changes_mode().has_before();
let before_record_reg = if cdc_has_before {
Some(emit_cdc_full_record(
program,
&table_reference.table,
main_table_cursor_id,
rowid_reg,
))
} else {
None
};
emit_cdc_insns(
program,
&t_ctx.resolver,
OperationMode::DELETE,
turso_cdc_cursor_id,
cdc_cursor_id,
rowid_reg,
before_record_reg,
None,
table_reference.table.get_name(),
)?;
}
@@ -1097,52 +1111,36 @@ fn emit_update_insns(
});
}
if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id {
let rowid_reg = program.alloc_register();
// create alias for CDC rowid after the change (will differ from cdc_rowid_before_reg only in case of UPDATE with change in rowid alias)
let cdc_rowid_after_reg = rowid_set_clause_reg.unwrap_or(beg);
// create separate register with rowid before UPDATE for CDC
let cdc_rowid_before_reg = if t_ctx.cdc_cursor_id.is_some() {
let cdc_rowid_before_reg = program.alloc_register();
if has_user_provided_rowid {
program.emit_insn(Insn::RowId {
cursor_id,
dest: rowid_reg,
dest: cdc_rowid_before_reg,
});
emit_cdc_insns(
program,
&t_ctx.resolver,
OperationMode::DELETE,
cdc_cursor_id,
rowid_reg,
table_ref.table.get_name(),
)?;
program.emit_insn(Insn::Copy {
src_reg: rowid_set_clause_reg.expect(
"rowid_set_clause_reg must be set because has_user_provided_rowid is true",
),
dst_reg: rowid_reg,
amount: 1,
});
emit_cdc_insns(
program,
&t_ctx.resolver,
OperationMode::INSERT,
cdc_cursor_id,
rowid_reg,
table_ref.table.get_name(),
)?;
Some(cdc_rowid_before_reg)
} else {
program.emit_insn(Insn::Copy {
src_reg: rowid_set_clause_reg.unwrap_or(beg),
dst_reg: rowid_reg,
amount: 1,
});
emit_cdc_insns(
program,
&t_ctx.resolver,
OperationMode::UPDATE,
cdc_cursor_id,
rowid_reg,
table_ref.table.get_name(),
)?;
Some(cdc_rowid_after_reg)
}
}
} else {
None
};
// create full CDC record before update if necessary
let cdc_before_reg = if program.capture_data_changes_mode().has_before() {
Some(emit_cdc_full_record(
program,
&table_ref.table,
cursor_id,
cdc_rowid_before_reg.expect("cdc_rowid_before_reg must be set"),
))
} else {
None
};
// If we are updating the rowid, we cannot rely on overwrite on the
// Insert instruction to update the cell. We need to first delete the current cell
@@ -1158,6 +1156,58 @@ fn emit_update_insns(
flag: InsertFlags::new().update(true),
table_name: table_ref.identifier.clone(),
});
// create full CDC record after update if necessary
let cdc_after_reg = if program.capture_data_changes_mode().has_after() {
Some(emit_cdc_patch_record(
program,
&table_ref.table,
start,
record_reg,
cdc_rowid_after_reg,
))
} else {
None
};
// emit actual CDC instructions for write to the CDC table
if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id {
let cdc_rowid_before_reg =
cdc_rowid_before_reg.expect("cdc_rowid_before_reg must be set");
if has_user_provided_rowid {
emit_cdc_insns(
program,
&t_ctx.resolver,
OperationMode::DELETE,
cdc_cursor_id,
cdc_rowid_before_reg,
cdc_before_reg,
None,
table_ref.table.get_name(),
)?;
emit_cdc_insns(
program,
&t_ctx.resolver,
OperationMode::INSERT,
cdc_cursor_id,
cdc_rowid_after_reg,
cdc_after_reg,
None,
table_ref.table.get_name(),
)?;
} else {
emit_cdc_insns(
program,
&t_ctx.resolver,
OperationMode::UPDATE,
cdc_cursor_id,
cdc_rowid_before_reg,
cdc_before_reg,
cdc_after_reg,
table_ref.table.get_name(),
)?;
}
}
} else if table_ref.virtual_table().is_some() {
let arg_count = table_ref.columns().len() + 2;
program.emit_insn(Insn::VUpdate {
@@ -1183,16 +1233,75 @@ fn emit_update_insns(
Ok(())
}
pub fn emit_cdc_patch_record(
program: &mut ProgramBuilder,
table: &Table,
columns_reg: usize,
record_reg: usize,
rowid_reg: usize,
) -> usize {
let columns = table.columns();
let rowid_alias_position = columns.iter().position(|x| x.is_rowid_alias);
if let Some(rowid_alias_position) = rowid_alias_position {
let record_reg = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: rowid_reg,
dst_reg: columns_reg + rowid_alias_position,
amount: 0,
});
program.emit_insn(Insn::MakeRecord {
start_reg: columns_reg,
count: table.columns().len(),
dest_reg: record_reg,
index_name: None,
});
record_reg
} else {
record_reg
}
}
pub fn emit_cdc_full_record(
program: &mut ProgramBuilder,
table: &Table,
table_cursor_id: usize,
rowid_reg: usize,
) -> usize {
let columns = table.columns();
let columns_reg = program.alloc_registers(columns.len() + 1);
for (i, column) in columns.iter().enumerate() {
if column.is_rowid_alias {
program.emit_insn(Insn::Copy {
src_reg: rowid_reg,
dst_reg: columns_reg + 1 + i,
amount: 0,
});
} else {
program.emit_column(table_cursor_id, i, columns_reg + 1 + i);
}
}
program.emit_insn(Insn::MakeRecord {
start_reg: columns_reg + 1,
count: columns.len(),
dest_reg: columns_reg,
index_name: None,
});
columns_reg
}
#[allow(clippy::too_many_arguments)]
pub fn emit_cdc_insns(
program: &mut ProgramBuilder,
resolver: &Resolver,
operation_mode: OperationMode,
cdc_cursor_id: usize,
rowid_reg: usize,
before_record_reg: Option<usize>,
after_record_reg: Option<usize>,
table_name: &str,
) -> Result<()> {
// (operation_id INTEGER PRIMARY KEY AUTOINCREMENT, operation_time INTEGER, operation_type INTEGER, table_name TEXT, id)
let turso_cdc_registers = program.alloc_registers(5);
// (operation_id INTEGER PRIMARY KEY AUTOINCREMENT, operation_time INTEGER, operation_type INTEGER, table_name TEXT, id, before BLOB, after BLOB)
let turso_cdc_registers = program.alloc_registers(7);
program.emit_insn(Insn::Null {
dest: turso_cdc_registers,
dest_end: None,
@@ -1231,6 +1340,28 @@ pub fn emit_cdc_insns(
amount: 0,
});
if let Some(before_record_reg) = before_record_reg {
program.emit_insn(Insn::Copy {
src_reg: before_record_reg,
dst_reg: turso_cdc_registers + 5,
amount: 0,
});
} else {
program.emit_null(turso_cdc_registers + 5, None);
program.mark_last_insn_constant();
}
if let Some(after_record_reg) = after_record_reg {
program.emit_insn(Insn::Copy {
src_reg: after_record_reg,
dst_reg: turso_cdc_registers + 6,
amount: 0,
});
} else {
program.emit_null(turso_cdc_registers + 6, None);
program.mark_last_insn_constant();
}
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: cdc_cursor_id,
@@ -1241,7 +1372,7 @@ pub fn emit_cdc_insns(
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: turso_cdc_registers,
count: 5,
count: 7,
dest_reg: record_reg,
index_name: None,
});

View File

@@ -1730,13 +1730,64 @@ pub fn translate_expr(
start_reg,
resolver,
)?;
program.emit_insn(Insn::Copy {
src_reg: start_reg,
dst_reg: target_register,
amount: 0,
});
Ok(target_register)
}
ScalarFunc::TableColumnsJsonArray => {
if args.is_none() || args.as_ref().unwrap().len() != 1 {
crate::bail_parse_error!(
"table_columns_json_array() function must have exactly 1 argument",
);
}
let args = args.as_ref().unwrap();
let start_reg = program.alloc_register();
translate_expr(
program,
referenced_tables,
&args[0],
start_reg,
resolver,
)?;
program.emit_insn(Insn::Function {
constant_mask: 0,
start_reg,
dest: target_register,
func: func_ctx,
});
Ok(target_register)
}
ScalarFunc::BinRecordJsonObject => {
if args.is_none() || args.as_ref().unwrap().len() != 2 {
crate::bail_parse_error!(
"bin_record_json_object() function must have exactly 2 arguments",
);
}
let args = args.as_ref().unwrap();
let start_reg = program.alloc_registers(2);
translate_expr(
program,
referenced_tables,
&args[0],
start_reg,
resolver,
)?;
translate_expr(
program,
referenced_tables,
&args[1],
start_reg + 1,
resolver,
)?;
program.emit_insn(Insn::Function {
constant_mask: 0,
start_reg,
dest: target_register,
func: func_ctx,
});
Ok(target_register)
}
}

View File

@@ -6,7 +6,7 @@ use turso_sqlite3_parser::ast::{
use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY};
use crate::schema::{IndexColumn, Table};
use crate::translate::emitter::{emit_cdc_insns, OperationMode};
use crate::translate::emitter::{emit_cdc_insns, emit_cdc_patch_record, OperationMode};
use crate::util::normalize_ident;
use crate::vdbe::builder::ProgramBuilderOpts;
use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral};
@@ -444,18 +444,6 @@ pub fn translate_insert(
_ => (),
}
// Write record to the turso_cdc table if necessary
if let Some((cdc_cursor_id, _)) = &cdc_table {
emit_cdc_insns(
&mut program,
&resolver,
OperationMode::INSERT,
*cdc_cursor_id,
rowid_reg,
&table_name.0,
)?;
}
let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?;
for index_col_mapping in index_col_mappings {
// find which cursor we opened earlier for this index
@@ -574,7 +562,6 @@ pub fn translate_insert(
dest_reg: record_register,
index_name: None,
});
program.emit_insn(Insn::Insert {
cursor: cursor_id,
key_reg: rowid_reg,
@@ -583,6 +570,32 @@ pub fn translate_insert(
table_name: table_name.to_string(),
});
// Emit update in the CDC table if necessary (after the INSERT updated the table)
if let Some((cdc_cursor_id, _)) = &cdc_table {
let cdc_has_after = program.capture_data_changes_mode().has_after();
let after_record_reg = if cdc_has_after {
Some(emit_cdc_patch_record(
&mut program,
&table,
column_registers_start,
record_register,
rowid_reg,
))
} else {
None
};
emit_cdc_insns(
&mut program,
&resolver,
OperationMode::INSERT,
*cdc_cursor_id,
rowid_reg,
None,
after_record_reg,
&table_name.0,
)?;
}
if inserting_multiple_rows {
if let Some(temp_table_ctx) = temp_table_ctx {
program.emit_insn(Insn::Next {

View File

@@ -510,5 +510,21 @@ fn turso_cdc_table_columns() -> Vec<ColumnDefinition> {
col_type: None,
constraints: vec![],
},
ast::ColumnDefinition {
col_name: ast::Name("before".to_string()),
col_type: Some(ast::Type {
name: "BLOB".to_string(),
size: None,
}),
constraints: vec![],
},
ast::ColumnDefinition {
col_name: ast::Name("after".to_string()),
col_type: Some(ast::Type {
name: "BLOB".to_string(),
size: None,
}),
constraints: vec![],
},
]
}

View File

@@ -48,7 +48,7 @@ impl Display for ValueType {
}
}
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TextSubtype {
Text,
@@ -127,6 +127,10 @@ impl Display for TextRef {
}
impl TextRef {
pub fn create_from(value: &[u8], subtype: TextSubtype) -> Self {
let value = RawSlice::create_from(value);
Self { value, subtype }
}
pub fn as_str(&self) -> &str {
unsafe { std::str::from_utf8_unchecked(self.value.to_slice()) }
}
@@ -855,19 +859,29 @@ impl ImmutableRecord {
cursor.get_values(self).unwrap_or_default()
}
pub fn from_registers<'a>(
registers: impl IntoIterator<Item = &'a Register> + Copy,
pub fn from_registers<'a, I: Iterator<Item = &'a Register> + Clone>(
// we need to accept both &[Register] and &[&Register] values - that's why non-trivial signature
//
// std::slice::Iter under the hood just stores pointer and length of slice and also implements a Clone which just copy those meta-values
// (without copying the data itself)
registers: impl IntoIterator<Item = &'a Register, IntoIter = I>,
len: usize,
) -> Self {
let mut values = Vec::with_capacity(len);
Self::from_values(registers.into_iter().map(|x| x.get_owned_value()), len)
}
pub fn from_values<'a>(
values: impl IntoIterator<Item = &'a Value> + Clone,
len: usize,
) -> Self {
let mut ref_values = Vec::with_capacity(len);
let mut serials = Vec::with_capacity(len);
let mut size_header = 0;
let mut size_values = 0;
let mut serial_type_buf = [0; 9];
// write serial types
for value in registers {
let value = value.get_owned_value();
for value in values.clone() {
let serial_type = SerialType::from(value);
let n = write_varint(&mut serial_type_buf[0..], serial_type.into());
serials.push((serial_type_buf, n));
@@ -916,15 +930,14 @@ impl ImmutableRecord {
}
// write content
for value in registers {
let value = value.get_owned_value();
for value in values {
let start_offset = writer.pos;
match value {
Value::Null => {
values.push(RefValue::Null);
ref_values.push(RefValue::Null);
}
Value::Integer(i) => {
values.push(RefValue::Integer(*i));
ref_values.push(RefValue::Integer(*i));
let serial_type = SerialType::from(value);
match serial_type.kind() {
SerialTypeKind::ConstInt0 | SerialTypeKind::ConstInt1 => {}
@@ -940,7 +953,7 @@ impl ImmutableRecord {
}
}
Value::Float(f) => {
values.push(RefValue::Float(*f));
ref_values.push(RefValue::Float(*f));
writer.extend_from_slice(&f.to_be_bytes())
}
Value::Text(t) => {
@@ -950,16 +963,16 @@ impl ImmutableRecord {
let ptr = unsafe { writer.buf.as_ptr().add(start_offset) };
let value = RefValue::Text(TextRef {
value: RawSlice::new(ptr, len),
subtype: t.subtype.clone(),
subtype: t.subtype,
});
values.push(value);
ref_values.push(value);
}
Value::Blob(b) => {
writer.extend_from_slice(b);
let end_offset = writer.pos;
let len = end_offset - start_offset;
let ptr = unsafe { writer.buf.as_ptr().add(start_offset) };
values.push(RefValue::Blob(RawSlice::new(ptr, len)));
ref_values.push(RefValue::Blob(RawSlice::new(ptr, len)));
}
};
}
@@ -1352,7 +1365,7 @@ impl RefValue {
RefValue::Float(f) => Value::Float(*f),
RefValue::Text(text_ref) => Value::Text(Text {
value: text_ref.value.to_slice().to_vec(),
subtype: text_ref.subtype.clone(),
subtype: text_ref.subtype,
}),
RefValue::Blob(b) => Value::Blob(b.to_slice().to_vec()),
}
@@ -2337,6 +2350,14 @@ pub enum SeekKey<'a> {
}
impl RawSlice {
pub fn create_from(value: &[u8]) -> Self {
if value.is_empty() {
RawSlice::new(std::ptr::null(), 0)
} else {
let ptr = &value[0] as *const u8;
RawSlice::new(ptr, value.len())
}
}
pub fn new(data: *const u8, len: usize) -> Self {
Self { data, len }
}
@@ -2405,7 +2426,7 @@ mod tests {
Value::Float(f) => RefValue::Float(*f),
Value::Text(text) => RefValue::Text(TextRef {
value: RawSlice::from_slice(&text.value),
subtype: text.subtype.clone(),
subtype: text.subtype,
}),
Value::Blob(blob) => RefValue::Blob(RawSlice::from_slice(blob)),
}

View File

@@ -79,8 +79,8 @@ use std::{cell::RefCell, collections::HashMap};
#[cfg(feature = "json")]
use crate::{
function::JsonFunc, json::convert_dbtype_to_raw_jsonb, json::get_json, json::is_json_valid,
json::json_array, json::json_array_length, json::json_arrow_extract,
function::JsonFunc, json, json::convert_dbtype_to_raw_jsonb, json::get_json,
json::is_json_valid, json::json_array, json::json_array_length, json::json_arrow_extract,
json::json_arrow_shift_extract, json::json_error_position, json::json_extract,
json::json_from_raw_bytes_agg, json::json_insert, json::json_object, json::json_patch,
json::json_quote, json::json_remove, json::json_replace, json::json_set, json::json_type,
@@ -1597,7 +1597,7 @@ pub fn op_column(
if existing_text.value.capacity() >= new_text.value.len() {
existing_text.value.clear();
existing_text.value.extend_from_slice(&new_text.value);
existing_text.subtype = new_text.subtype.clone();
existing_text.subtype = new_text.subtype;
} else {
state.registers[*dest] = Register::Value(value);
}
@@ -4040,6 +4040,132 @@ pub fn op_function(
.exec_likelihood(probability.get_owned_value());
state.registers[*dest] = Register::Value(result);
}
ScalarFunc::TableColumnsJsonArray => {
assert_eq!(arg_count, 1);
#[cfg(not(feature = "json"))]
{
return Err(LimboError::InvalidArgument(
"table_columns_json_array: turso must be compiled with JSON support"
.to_string(),
));
}
#[cfg(feature = "json")]
{
use crate::types::{TextRef, TextSubtype};
let table = state.registers[*start_reg].get_owned_value();
let Value::Text(table) = table else {
return Err(LimboError::InvalidArgument(
"table_columns_json_array: function argument must be of type TEXT"
.to_string(),
));
};
let table = {
let schema = program.connection.schema.borrow();
match schema.get_table(table.as_str()) {
Some(table) => table,
None => {
return Err(LimboError::InvalidArgument(format!(
"table_columns_json_array: table {} doesn't exists",
table
)))
}
}
};
let mut json = json::jsonb::Jsonb::make_empty_array(table.columns().len() * 10);
for column in table.columns() {
let name = column.name.as_ref().unwrap();
let name_json = json::convert_ref_dbtype_to_jsonb(
&RefValue::Text(TextRef::create_from(
name.as_str().as_bytes(),
TextSubtype::Text,
)),
json::Conv::ToString,
)?;
json.append_jsonb_to_end(name_json.data());
}
json.finalize_unsafe(json::jsonb::ElementType::ARRAY)?;
state.registers[*dest] = Register::Value(json::json_string_to_db_type(
json,
json::jsonb::ElementType::ARRAY,
json::OutputVariant::String,
)?);
}
}
ScalarFunc::BinRecordJsonObject => {
assert_eq!(arg_count, 2);
#[cfg(not(feature = "json"))]
{
return Err(LimboError::InvalidArgument(
"bin_record_json_object: turso must be compiled with JSON support"
.to_string(),
));
}
#[cfg(feature = "json")]
'outer: {
use crate::types::RecordCursor;
use std::str::FromStr;
let columns_str = state.registers[*start_reg].get_owned_value();
let bin_record = state.registers[*start_reg + 1].get_owned_value();
let Value::Text(columns_str) = columns_str else {
return Err(LimboError::InvalidArgument(
"bin_record_json_object: function arguments must be of type TEXT and BLOB correspondingly".to_string()
));
};
if let Value::Null = bin_record {
state.registers[*dest] = Register::Value(Value::Null);
break 'outer;
}
let Value::Blob(bin_record) = bin_record else {
return Err(LimboError::InvalidArgument(
"bin_record_json_object: function arguments must be of type TEXT and BLOB correspondingly".to_string()
));
};
let mut columns_json_array =
json::jsonb::Jsonb::from_str(columns_str.as_str())?;
let columns_len = columns_json_array.array_len()?;
let mut record = ImmutableRecord::new(bin_record.len());
record.start_serialization(bin_record);
let mut record_cursor = RecordCursor::new();
let mut json = json::jsonb::Jsonb::make_empty_obj(columns_len);
for i in 0..columns_len {
let mut op = json::jsonb::SearchOperation::new(0);
let path = json::path::JsonPath {
elements: vec![
json::path::PathElement::Root(),
json::path::PathElement::ArrayLocator(Some(i as i32)),
],
};
columns_json_array.operate_on_path(&path, &mut op)?;
let column_name = op.result();
json.append_jsonb_to_end(column_name.data());
let val = record_cursor.get_value(&record, i)?;
if let RefValue::Blob(..) = val {
return Err(LimboError::InvalidArgument(
"bin_record_json_object: formatting of BLOB values stored in binary record is not supported".to_string()
));
}
let val_json =
json::convert_ref_dbtype_to_jsonb(&val, json::Conv::NotStrict)?;
json.append_jsonb_to_end(val_json.data());
}
json.finalize_unsafe(json::jsonb::ElementType::OBJECT)?;
state.registers[*dest] = Register::Value(json::json_string_to_db_type(
json,
json::jsonb::ElementType::OBJECT,
json::OutputVariant::String,
)?);
}
}
},
crate::function::Func::Vector(vector_func) => match vector_func {
VectorFunc::Vector => {

View File

@@ -598,7 +598,7 @@ pub fn registers_to_ref_values(registers: &[Register]) -> Vec<RefValue> {
Value::Float(f) => RefValue::Float(*f),
Value::Text(t) => RefValue::Text(TextRef {
value: RawSlice::new(t.value.as_ptr(), t.value.len()),
subtype: t.subtype.clone(),
subtype: t.subtype,
}),
Value::Blob(b) => RefValue::Blob(RawSlice::new(b.as_ptr(), b.len())),
}

View File

@@ -1,4 +1,5 @@
use rusqlite::types::Value;
use turso_core::types::ImmutableRecord;
use crate::common::{limbo_exec_rows, TempDatabase};
@@ -14,10 +15,10 @@ fn replace_column_with_null(rows: Vec<Vec<Value>>, column: usize) -> Vec<Vec<Val
}
#[test]
fn test_cdc_simple() {
fn test_cdc_simple_id() {
let db = TempDatabase::new_empty(false);
let conn = db.connect_limbo();
conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
conn.execute("PRAGMA unstable_capture_data_changes_conn('id')")
.unwrap();
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
@@ -40,14 +41,232 @@ fn test_cdc_simple() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(10)
Value::Integer(10),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(5)
Value::Integer(5),
Value::Null,
Value::Null,
]
]
);
}
fn record<const N: usize>(values: [Value; N]) -> Vec<u8> {
let values = values
.into_iter()
.map(|x| match x {
Value::Null => turso_core::Value::Null,
Value::Integer(x) => turso_core::Value::Integer(x),
Value::Real(x) => turso_core::Value::Float(x),
Value::Text(x) => turso_core::Value::Text(turso_core::types::Text::new(&x)),
Value::Blob(x) => turso_core::Value::Blob(x),
})
.collect::<Vec<_>>();
ImmutableRecord::from_values(&values, values.len())
.get_payload()
.to_vec()
}
#[test]
fn test_cdc_simple_before() {
let db = TempDatabase::new_empty(false);
let conn = db.connect_limbo();
conn.execute("PRAGMA unstable_capture_data_changes_conn('before')")
.unwrap();
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
conn.execute("INSERT INTO t VALUES (1, 2), (3, 4)").unwrap();
conn.execute("UPDATE t SET y = 3 WHERE x = 1").unwrap();
conn.execute("DELETE FROM t WHERE x = 3").unwrap();
conn.execute("DELETE FROM t WHERE x = 1").unwrap();
let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1);
assert_eq!(
rows,
vec![
vec![
Value::Integer(1),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(3),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(0),
Value::Text("t".to_string()),
Value::Integer(1),
Value::Blob(record([Value::Integer(1), Value::Integer(2)])),
Value::Null,
],
vec![
Value::Integer(4),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(3),
Value::Blob(record([Value::Integer(3), Value::Integer(4)])),
Value::Null,
],
vec![
Value::Integer(5),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(1),
Value::Blob(record([Value::Integer(1), Value::Integer(3)])),
Value::Null,
]
]
);
}
#[test]
fn test_cdc_simple_after() {
let db = TempDatabase::new_empty(false);
let conn = db.connect_limbo();
conn.execute("PRAGMA unstable_capture_data_changes_conn('after')")
.unwrap();
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
conn.execute("INSERT INTO t VALUES (1, 2), (3, 4)").unwrap();
conn.execute("UPDATE t SET y = 3 WHERE x = 1").unwrap();
conn.execute("DELETE FROM t WHERE x = 3").unwrap();
conn.execute("DELETE FROM t WHERE x = 1").unwrap();
let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1);
assert_eq!(
rows,
vec![
vec![
Value::Integer(1),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1),
Value::Null,
Value::Blob(record([Value::Integer(1), Value::Integer(2)])),
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(3),
Value::Null,
Value::Blob(record([Value::Integer(3), Value::Integer(4)])),
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(0),
Value::Text("t".to_string()),
Value::Integer(1),
Value::Null,
Value::Blob(record([Value::Integer(1), Value::Integer(3)])),
],
vec![
Value::Integer(4),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(3),
Value::Null,
Value::Null,
],
vec![
Value::Integer(5),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(1),
Value::Null,
Value::Null,
]
]
);
}
#[test]
fn test_cdc_simple_full() {
let db = TempDatabase::new_empty(false);
let conn = db.connect_limbo();
conn.execute("PRAGMA unstable_capture_data_changes_conn('full')")
.unwrap();
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
conn.execute("INSERT INTO t VALUES (1, 2), (3, 4)").unwrap();
conn.execute("UPDATE t SET y = 3 WHERE x = 1").unwrap();
conn.execute("DELETE FROM t WHERE x = 3").unwrap();
conn.execute("DELETE FROM t WHERE x = 1").unwrap();
let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1);
assert_eq!(
rows,
vec![
vec![
Value::Integer(1),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1),
Value::Null,
Value::Blob(record([Value::Integer(1), Value::Integer(2)])),
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(3),
Value::Null,
Value::Blob(record([Value::Integer(3), Value::Integer(4)])),
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(0),
Value::Text("t".to_string()),
Value::Integer(1),
Value::Blob(record([Value::Integer(1), Value::Integer(2)])),
Value::Blob(record([Value::Integer(1), Value::Integer(3)])),
],
vec![
Value::Integer(4),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(3),
Value::Blob(record([Value::Integer(3), Value::Integer(4)])),
Value::Null,
],
vec![
Value::Integer(5),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(1),
Value::Blob(record([Value::Integer(1), Value::Integer(3)])),
Value::Null,
]
]
);
@@ -57,7 +276,7 @@ fn test_cdc_simple() {
fn test_cdc_crud() {
let db = TempDatabase::new_empty(false);
let conn = db.connect_limbo();
conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
conn.execute("PRAGMA unstable_capture_data_changes_conn('id')")
.unwrap();
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
@@ -85,63 +304,81 @@ fn test_cdc_crud() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(20)
Value::Integer(20),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(10)
Value::Integer(10),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(5)
Value::Integer(5),
Value::Null,
Value::Null,
],
vec![
Value::Integer(4),
Value::Null,
Value::Integer(0),
Value::Text("t".to_string()),
Value::Integer(5)
Value::Integer(5),
Value::Null,
Value::Null,
],
vec![
Value::Integer(5),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(10)
Value::Integer(10),
Value::Null,
Value::Null,
],
vec![
Value::Integer(6),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(20)
Value::Integer(20),
Value::Null,
Value::Null,
],
vec![
Value::Integer(7),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(8),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(9),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
]
);
@@ -151,7 +388,7 @@ fn test_cdc_crud() {
fn test_cdc_failed_op() {
let db = TempDatabase::new_empty(true);
let conn = db.connect_limbo();
conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
conn.execute("PRAGMA unstable_capture_data_changes_conn('id')")
.unwrap();
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
.unwrap();
@@ -182,28 +419,36 @@ fn test_cdc_failed_op() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(6)
Value::Integer(6),
Value::Null,
Value::Null,
],
vec![
Value::Integer(4),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(7)
Value::Integer(7),
Value::Null,
Value::Null,
],
]
);
@@ -218,13 +463,13 @@ fn test_cdc_uncaptured_connection() {
.unwrap();
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
.execute("PRAGMA unstable_capture_data_changes_conn('id')")
.unwrap();
conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); // captured
let conn2 = db.connect_limbo();
conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap();
conn2
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
.execute("PRAGMA unstable_capture_data_changes_conn('id')")
.unwrap();
conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap(); // captured
conn2
@@ -260,21 +505,27 @@ fn test_cdc_uncaptured_connection() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(4)
Value::Integer(4),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(6)
Value::Integer(6),
Value::Null,
Value::Null,
],
]
);
@@ -288,7 +539,7 @@ fn test_cdc_custom_table() {
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
.unwrap();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc')")
.unwrap();
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap();
@@ -310,14 +561,18 @@ fn test_cdc_custom_table() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
]
);
@@ -331,7 +586,7 @@ fn test_cdc_ignore_changes_in_cdc_table() {
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
.unwrap();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc')")
.unwrap();
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap();
@@ -355,7 +610,9 @@ fn test_cdc_ignore_changes_in_cdc_table() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],]
);
}
@@ -371,7 +628,7 @@ fn test_cdc_transaction() {
.execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y UNIQUE)")
.unwrap();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc')")
.unwrap();
conn1.execute("BEGIN").unwrap();
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
@@ -394,35 +651,45 @@ fn test_cdc_transaction() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("q".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(3)
Value::Integer(3),
Value::Null,
Value::Null,
],
vec![
Value::Integer(4),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(5),
Value::Null,
Value::Integer(0),
Value::Text("q".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
]
);
@@ -434,10 +701,10 @@ fn test_cdc_independent_connections() {
let conn1 = db.connect_limbo();
let conn2 = db.connect_limbo();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc1')")
.unwrap();
conn2
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc2')")
.unwrap();
conn1
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
@@ -461,7 +728,9 @@ fn test_cdc_independent_connections() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
]]
);
let rows =
@@ -473,7 +742,9 @@ fn test_cdc_independent_connections() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
]]
);
}
@@ -484,10 +755,10 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() {
let conn1 = db.connect_limbo();
let conn2 = db.connect_limbo();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc1')")
.unwrap();
conn2
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc2')")
.unwrap();
conn1
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
@@ -522,14 +793,18 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(-1),
Value::Text("custom_cdc2".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
]
]
);
@@ -543,15 +818,64 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(4)
Value::Integer(4),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(-1),
Value::Text("custom_cdc1".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
]
]
);
}
#[test]
fn test_cdc_table_columns() {
let db = TempDatabase::new_empty(true);
let conn = db.connect_limbo();
conn.execute("CREATE TABLE t(a INTEGER PRIMARY KEY, b, c UNIQUE)")
.unwrap();
let rows = limbo_exec_rows(&db, &conn, "SELECT table_columns_json_array('t')");
assert_eq!(
rows,
vec![vec![Value::Text(r#"["a","b","c"]"#.to_string())]]
);
conn.execute("ALTER TABLE t DROP COLUMN b").unwrap();
let rows = limbo_exec_rows(&db, &conn, "SELECT table_columns_json_array('t')");
assert_eq!(rows, vec![vec![Value::Text(r#"["a","c"]"#.to_string())]]);
}
#[test]
fn test_cdc_bin_record() {
let db = TempDatabase::new_empty(true);
let conn = db.connect_limbo();
let record = record([
Value::Null,
Value::Integer(1),
// use golden ratio instead of pi because clippy has weird rule that I can't use PI approximation written by hand
Value::Real(1.61803),
Value::Text("hello".to_string()),
]);
let mut record_hex = String::new();
for byte in record {
record_hex.push_str(&format!("{:02X}", byte));
}
let rows = limbo_exec_rows(
&db,
&conn,
&format!(r#"SELECT bin_record_json_object('["a","b","c","d"]', X'{record_hex}')"#),
);
assert_eq!(
rows,
vec![vec![Value::Text(
r#"{"a":null,"b":1,"c":1.61803,"d":"hello"}"#.to_string()
)]]
);
}