From 3e5bfb0083006017d7dcaee4e8abea7c50244c28 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 11:34:10 +0400 Subject: [PATCH 01/20] copy comments about pragma flags from SQLite source code --- core/pragma.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/pragma.rs b/core/pragma.rs index a65aefff4..1a41ed5de 100644 --- a/core/pragma.rs +++ b/core/pragma.rs @@ -8,14 +8,14 @@ bitflags! { // Flag names match those used in SQLite: // https://github.com/sqlite/sqlite/blob/b3c1884b65400da85636458298bd77cbbfdfb401/tool/mkpragmatab.tcl#L22-L29 struct PragmaFlags: u8 { - const NeedSchema = 0x01; - const NoColumns = 0x02; - const NoColumns1 = 0x04; - const ReadOnly = 0x08; - const Result0 = 0x10; - const Result1 = 0x20; - const SchemaOpt = 0x40; - const SchemaReq = 0x80; + const NeedSchema = 0x01; /* Force schema load before running */ + const NoColumns = 0x02; /* OP_ResultRow called with zero columns */ + const NoColumns1 = 0x04; /* zero columns if RHS argument is present */ + const ReadOnly = 0x08; /* Read-only HEADER_VALUE */ + const Result0 = 0x10; /* Acts as query when no argument */ + const Result1 = 0x20; /* Acts as query when has one argument */ + const SchemaOpt = 0x40; /* Schema restricts name search if present */ + const SchemaReq = 0x80; /* Schema required - "main" is default */ } } From 7ba8ab6efc61c4722dc03b939bd959fc47c73a1d Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 11:35:21 +0400 Subject: [PATCH 02/20] add simple method for parsing pragma boolean value --- core/util.rs | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/core/util.rs b/core/util.rs index 1415bcfe7..515c05598 100644 --- a/core/util.rs +++ b/core/util.rs @@ -1044,6 +1044,30 @@ pub fn parse_signed_number(expr: &Expr) -> Result { } } +pub fn parse_pragma_bool(expr: &Expr) -> Result { + const TRUE_VALUES: &[&str] = &["yes", "true", "on"]; + const FALSE_VALUES: &[&str] = &["no", "false", "off"]; + if let Ok(number) = parse_signed_number(expr) { + if let Value::Integer(x @ (0 | 1)) = number { + return Ok(x != 0); + } + } else { + if let Expr::Name(name) = expr { + let ident = normalize_ident(&name.0); + if TRUE_VALUES.contains(&ident.as_str()) { + return Ok(true); + } + if FALSE_VALUES.contains(&ident.as_str()) { + return Ok(false); + } + } + } + Err(LimboError::InvalidArgument( + "boolean pragma value must be either 0|1 integer or yes|true|on|no|false|off token" + .to_string(), + )) +} + // for TVF's we need these at planning time so we cannot emit translate_expr pub fn vtable_args(args: &[ast::Expr]) -> Vec { let mut vtable_args = Vec::new(); @@ -1076,7 +1100,7 @@ pub fn vtable_args(args: &[ast::Expr]) -> Vec { #[cfg(test)] pub mod tests { use super::*; - use turso_sqlite3_parser::ast::{self, Expr, Id, Literal, Operator::*, Type}; + use turso_sqlite3_parser::ast::{self, Expr, Id, Literal, Name, Operator::*, Type}; #[test] fn test_normalize_ident() { @@ -2031,4 +2055,44 @@ pub mod tests { Value::Float(-9.223_372_036_854_776e18) ); } + + #[test] + fn test_parse_pragma_bool() { + assert_eq!( + parse_pragma_bool(&Expr::Literal(Literal::Numeric("1".into()))).unwrap(), + true + ); + assert_eq!( + parse_pragma_bool(&Expr::Name(Name("true".into()))).unwrap(), + true + ); + assert_eq!( + parse_pragma_bool(&Expr::Name(Name("on".into()))).unwrap(), + true + ); + assert_eq!( + parse_pragma_bool(&Expr::Name(Name("yes".into()))).unwrap(), + true + ); + assert_eq!( + parse_pragma_bool(&Expr::Literal(Literal::Numeric("0".into()))).unwrap(), + false + ); + assert_eq!( + parse_pragma_bool(&Expr::Name(Name("false".into()))).unwrap(), + false + ); + assert_eq!( + parse_pragma_bool(&Expr::Name(Name("off".into()))).unwrap(), + false + ); + assert_eq!( + parse_pragma_bool(&Expr::Name(Name("no".into()))).unwrap(), + false + ); + + assert!(parse_pragma_bool(&Expr::Name(Name("nono".into()))).is_err()); + assert!(parse_pragma_bool(&Expr::Name(Name("10".into()))).is_err()); + assert!(parse_pragma_bool(&Expr::Name(Name("-1".into()))).is_err()); + } } From 3f0716b2a4e3d21aa48d626cb894eee07ef6d9d8 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 11:35:35 +0400 Subject: [PATCH 03/20] add capture_changes per-connection flag --- core/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/core/lib.rs b/core/lib.rs index 4067aac15..9fd1dddb9 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -278,6 +278,7 @@ impl Database { cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), + capture_changes: Cell::new(false), }); if let Err(e) = conn.register_builtins() { return Err(LimboError::ExtensionError(e)); @@ -330,6 +331,7 @@ impl Database { cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), + capture_changes: Cell::new(false), }); if let Err(e) = conn.register_builtins() { @@ -450,6 +452,7 @@ pub struct Connection { cache_size: Cell, readonly: Cell, wal_checkpoint_disabled: Cell, + capture_changes: Cell, } impl Connection { @@ -724,6 +727,13 @@ impl Connection { self.cache_size.set(size); } + pub fn get_capture_changes(&self) -> bool { + self.capture_changes.get() + } + pub fn set_capture_changes(&self, value: bool) { + self.capture_changes.set(value); + } + #[cfg(feature = "fs")] pub fn open_new(&self, path: &str, vfs: &str) -> Result<(Arc, Arc)> { Database::open_with_vfs(&self._db, path, vfs) From b0fc67a31479ae55262b4807e92ab416bb0a3406 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 12:19:58 +0400 Subject: [PATCH 04/20] pass ownership or program to the pragma translators - just as with other statements --- core/translate/pragma.rs | 87 ++++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 49 deletions(-) diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 93883a14b..374162b41 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -57,17 +57,15 @@ pub fn translate_pragma( Err(_) => bail_parse_error!("Not a valid pragma name"), }; - match body { - None => { - query_pragma(pragma, schema, None, pager, connection, &mut program)?; - } + let mut program = match body { + None => query_pragma(pragma, schema, None, pager, connection, program)?, Some(ast::PragmaBody::Equals(value) | ast::PragmaBody::Call(value)) => match pragma { PragmaName::TableInfo => { - query_pragma(pragma, schema, Some(value), pager, connection, &mut program)?; + query_pragma(pragma, schema, Some(value), pager, connection, program)? } _ => { write = true; - update_pragma(pragma, schema, value, pager, connection, &mut program)?; + update_pragma(pragma, schema, value, pager, connection, program)? } }, }; @@ -85,8 +83,8 @@ fn update_pragma( value: ast::Expr, pager: Rc, connection: Arc, - program: &mut ProgramBuilder, -) -> crate::Result<()> { + mut program: ProgramBuilder, +) -> crate::Result { match pragma { PragmaName::CacheSize => { let cache_size = match parse_signed_number(&value)? { @@ -95,42 +93,33 @@ fn update_pragma( _ => bail_parse_error!("Invalid value for cache size pragma"), }; update_cache_size(cache_size, pager, connection)?; - Ok(()) - } - PragmaName::JournalMode => { - query_pragma( - PragmaName::JournalMode, - schema, - None, - pager, - connection, - program, - )?; - Ok(()) - } - PragmaName::LegacyFileFormat => Ok(()), - PragmaName::WalCheckpoint => { - query_pragma( - PragmaName::WalCheckpoint, - schema, - Some(value), - pager, - connection, - program, - )?; - Ok(()) - } - PragmaName::PageCount => { - query_pragma( - PragmaName::PageCount, - schema, - None, - pager, - connection, - program, - )?; - Ok(()) + Ok(program) } + PragmaName::JournalMode => query_pragma( + PragmaName::JournalMode, + schema, + None, + pager, + connection, + program, + ), + PragmaName::LegacyFileFormat => Ok(program), + PragmaName::WalCheckpoint => query_pragma( + PragmaName::WalCheckpoint, + schema, + Some(value), + pager, + connection, + program, + ), + PragmaName::PageCount => query_pragma( + PragmaName::PageCount, + schema, + None, + pager, + connection, + program, + ), PragmaName::UserVersion => { let data = parse_signed_number(&value)?; let version_value = match data { @@ -145,7 +134,7 @@ fn update_pragma( value: version_value, p5: 1, }); - Ok(()) + Ok(program) } PragmaName::SchemaVersion => { // TODO: Implement updating schema_version @@ -214,7 +203,7 @@ fn update_pragma( value: auto_vacuum_mode - 1, p5: 0, }); - Ok(()) + Ok(program) } PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"), } @@ -226,8 +215,8 @@ fn query_pragma( value: Option, pager: Rc, connection: Arc, - program: &mut ProgramBuilder, -) -> crate::Result<()> { + mut program: ProgramBuilder, +) -> crate::Result { let register = program.alloc_register(); match pragma { PragmaName::CacheSize => { @@ -365,11 +354,11 @@ fn query_pragma( program.emit_result_row(register, 1); } PragmaName::IntegrityCheck => { - translate_integrity_check(schema, program)?; + translate_integrity_check(schema, &mut program)?; } } - Ok(()) + Ok(program) } fn update_auto_vacuum_mode( From 234dda322f218b540c988e09a619950b6e0fa4b5 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 12:23:58 +0400 Subject: [PATCH 05/20] handle change_capture pragma --- core/pragma.rs | 4 + core/translate/mod.rs | 8 +- core/translate/pragma.rs | 82 ++++++++++++++++++- vendored/sqlite3-parser/src/parser/ast/mod.rs | 19 +++++ 4 files changed, 110 insertions(+), 3 deletions(-) diff --git a/core/pragma.rs b/core/pragma.rs index 1a41ed5de..dca8bc169 100644 --- a/core/pragma.rs +++ b/core/pragma.rs @@ -77,6 +77,10 @@ fn pragma_for(pragma: PragmaName) -> Pragma { PragmaFlags::NeedSchema | PragmaFlags::ReadOnly | PragmaFlags::Result0, &["message"], ), + CaptureChanges => Pragma::new( + PragmaFlags::NeedSchema | PragmaFlags::Result0 | PragmaFlags::SchemaReq, + &["capture_changes"], + ), } } diff --git a/core/translate/mod.rs b/core/translate/mod.rs index b7c82d585..62129d51f 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -37,7 +37,7 @@ mod values; use crate::schema::Schema; use crate::storage::pager::Pager; use crate::translate::delete::translate_delete; -use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts, QueryMode}; +use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderFlags, ProgramBuilderOpts, QueryMode}; use crate::vdbe::Program; use crate::{bail_parse_error, Connection, Result, SymbolTable}; use alter::translate_alter_table; @@ -73,8 +73,14 @@ pub fn translate( | ast::Stmt::Update(..) ); + let flags = if connection.get_capture_changes() { + ProgramBuilderFlags::CaptureChanges + } else { + ProgramBuilderFlags::empty() + }; let mut program = ProgramBuilder::new( query_mode, + flags, // These options will be extended whithin each translate program ProgramBuilderOpts { num_cursors: 1, diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 374162b41..2d563c3c5 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -3,14 +3,15 @@ use std::rc::Rc; use std::sync::Arc; -use turso_sqlite3_parser::ast::PragmaName; use turso_sqlite3_parser::ast::{self, Expr}; +use turso_sqlite3_parser::ast::{PragmaName, QualifiedName}; use crate::schema::Schema; use crate::storage::pager::AutoVacuumMode; use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE; use crate::storage::wal::CheckpointMode; -use crate::util::{normalize_ident, parse_signed_number}; +use crate::translate::schema::translate_create_table; +use crate::util::{normalize_ident, parse_pragma_bool, parse_signed_number}; use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts}; use crate::vdbe::insn::{Cookie, Insn}; use crate::{bail_parse_error, storage, LimboError, Value}; @@ -206,6 +207,78 @@ fn update_pragma( Ok(program) } PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"), + PragmaName::CaptureChanges => { + let value = parse_pragma_bool(&value)?; + // todo(sivukhin): ideally, we should consistently update capture_changes connection flag only after successfull execution of schema change statement + // but for now, let's keep it as is... + connection.set_capture_changes(value); + if value { + // make sure that we have turso_cdc table created + let columns = vec![ + ast::ColumnDefinition { + col_name: ast::Name("operation_id".to_string()), + col_type: Some(ast::Type { + name: "INTEGER".to_string(), + size: None, + }), + constraints: vec![ast::NamedColumnConstraint { + name: None, + constraint: ast::ColumnConstraint::PrimaryKey { + order: None, + conflict_clause: None, + auto_increment: false, + }, + }], + }, + ast::ColumnDefinition { + col_name: ast::Name("operation_time".to_string()), + col_type: Some(ast::Type { + name: "INTEGER".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("operation_type".to_string()), + col_type: Some(ast::Type { + name: "INTEGER".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("table_name".to_string()), + col_type: Some(ast::Type { + name: "TEXT".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("row_key".to_string()), + col_type: Some(ast::Type { + name: "BLOB".to_string(), + size: None, + }), + constraints: vec![], + }, + ]; + return translate_create_table( + QualifiedName::single(ast::Name("turso_cdc".into())), + false, + ast::CreateTableBody::columns_and_constraints_from_definition( + columns, + None, + ast::TableOptions::NONE, + ) + .unwrap(), + true, + schema, + program, + ); + } + Ok(program) + } } } @@ -356,6 +429,11 @@ fn query_pragma( PragmaName::IntegrityCheck => { translate_integrity_check(schema, &mut program)?; } + PragmaName::CaptureChanges => { + program.emit_bool(connection.get_capture_changes(), register); + program.emit_result_row(register, 1); + program.add_pragma_result_column(pragma.to_string()); + } } Ok(program) diff --git a/vendored/sqlite3-parser/src/parser/ast/mod.rs b/vendored/sqlite3-parser/src/parser/ast/mod.rs index 956fa88bd..619689fb0 100644 --- a/vendored/sqlite3-parser/src/parser/ast/mod.rs +++ b/vendored/sqlite3-parser/src/parser/ast/mod.rs @@ -1362,6 +1362,23 @@ impl CreateTableBody { options, }) } + + /// Constructor from Vec of column definition + pub fn columns_and_constraints_from_definition( + columns_vec: Vec, + constraints: Option>, + options: TableOptions, + ) -> Result { + let mut columns = IndexMap::new(); + for def in columns_vec { + columns.insert(def.col_name.clone(), def); + } + Ok(Self::ColumnsAndConstraints { + columns, + constraints, + options, + }) + } } /// Table column definition @@ -1748,6 +1765,8 @@ pub enum PragmaName { UserVersion, /// trigger a checkpoint to run on database(s) if WAL is enabled WalCheckpoint, + /// enable capture-changes logic for the connection + CaptureChanges, } /// `CREATE TRIGGER` time From cf7ae031c79683eb183beaa24b1d5dae5be09509 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 14:19:02 +0400 Subject: [PATCH 06/20] add ProgramBuilderFlags to the builder --- core/vdbe/builder.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 7d45c5f4b..4a32c9dad 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -1,5 +1,6 @@ use std::{cell::Cell, cmp::Ordering, rc::Rc, sync::Arc}; +use bitflags::bitflags; use tracing::{instrument, Level}; use turso_sqlite3_parser::ast::{self, TableInternalId}; @@ -110,6 +111,7 @@ pub struct ProgramBuilder { nested_level: usize, init_label: BranchOffset, start_offset: BranchOffset, + flags: ProgramBuilderFlags, } #[derive(Debug, Clone)] @@ -133,6 +135,12 @@ pub enum QueryMode { Explain, } +bitflags! { + pub struct ProgramBuilderFlags: u8 { + const CaptureChanges = 0x01; /* emit plans with capture changes instructurs for INSERT/DELETE/UPDATE operations */ + } +} + impl From for QueryMode { fn from(stmt: ast::Cmd) -> Self { match stmt { @@ -149,7 +157,11 @@ pub struct ProgramBuilderOpts { } impl ProgramBuilder { - pub fn new(query_mode: QueryMode, opts: ProgramBuilderOpts) -> Self { + pub fn new( + query_mode: QueryMode, + flags: ProgramBuilderFlags, + opts: ProgramBuilderOpts, + ) -> Self { Self { table_reference_counter: TableRefIdCounter::new(), next_free_register: 1, @@ -172,9 +184,14 @@ impl ProgramBuilder { // These labels will be filled when `prologue()` is called init_label: BranchOffset::Placeholder, start_offset: BranchOffset::Placeholder, + flags, } } + pub fn flags(&self) -> &ProgramBuilderFlags { + &self.flags + } + pub fn extend(&mut self, opts: &ProgramBuilderOpts) { self.insns.reserve(opts.approx_num_insns); self.cursor_ref.reserve(opts.num_cursors); From d72ba9877a6b67b5d238b985515ada81ed542c92 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 14:19:18 +0400 Subject: [PATCH 07/20] emit turso_cdc table changes in Insert query plan --- core/translate/insert.rs | 93 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 91 insertions(+), 2 deletions(-) diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 872b891c7..71be2164b 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -7,9 +7,10 @@ use turso_sqlite3_parser::ast::{ use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY}; use crate::schema::{IndexColumn, Table}; use crate::util::normalize_ident; -use crate::vdbe::builder::ProgramBuilderOpts; +use crate::vdbe::builder::{ProgramBuilderFlags, ProgramBuilderOpts}; use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral}; use crate::vdbe::BranchOffset; +use crate::{bail_parse_error, Result, SymbolTable, VirtualTable}; use crate::{ schema::{Column, Schema}, vdbe::{ @@ -17,7 +18,6 @@ use crate::{ insn::Insn, }, }; -use crate::{Result, SymbolTable, VirtualTable}; use super::emitter::Resolver; use super::expr::{translate_expr, translate_expr_no_constant_opt, NoConstantOptReason}; @@ -116,6 +116,24 @@ pub fn translate_insert( let halt_label = program.allocate_label(); let loop_start_label = program.allocate_label(); + let capture_changes = program + .flags() + .contains(ProgramBuilderFlags::CaptureChanges); + let turso_cdc_table = if capture_changes { + let Some(turso_cdc_table) = schema.get_table("turso_cdc") else { + crate::bail_parse_error!("no such table: {}", "turso_cdc"); + }; + let Some(turso_cdc_btree) = turso_cdc_table.btree().clone() else { + crate::bail_parse_error!("no such table: {}", "turso_cdc"); + }; + Some(( + program.alloc_cursor_id(CursorType::BTreeTable(turso_cdc_btree.clone())), + turso_cdc_btree, + )) + } else { + None + }; + let mut yield_reg_opt = None; let mut temp_table_ctx = None; let (num_values, cursor_id) = match body { @@ -328,6 +346,15 @@ pub fn translate_insert( &resolver, )?; } + // Open turso_cdc table btree for writing if necessary + if let Some((turso_cdc_cursor_id, turso_cdc_btree)) = &turso_cdc_table { + program.emit_insn(Insn::OpenWrite { + cursor_id: *turso_cdc_cursor_id, + root_page: RegisterOrLiteral::Literal(turso_cdc_btree.root_page), + name: turso_cdc_btree.name.clone(), + }); + } + // Open all the index btrees for writing for idx_cursor in idx_cursors.iter() { program.emit_insn(Insn::OpenWrite { @@ -414,6 +441,68 @@ pub fn translate_insert( _ => (), } + // Write record to the turso_cdc table if necessary + if let Some((turso_cdc_cursor_id, turso_cdc_btree)) = &turso_cdc_table { + // (operation_id INTEGER PRIMARY KEY, operation_time INTEGER, operation_type INTEGER, table_name TEXT, row_key BLOB) + let turso_cdc_registers = program.alloc_registers(5); + program.emit_insn(Insn::Null { + dest: turso_cdc_registers, + dest_end: None, + }); + program.mark_last_insn_constant(); + + let Some(unixepoch_fn) = resolver.resolve_function("unixepoch", 0) else { + bail_parse_error!("no function {}", "unixepoch"); + }; + let unixepoch_fn_ctx = crate::function::FuncCtx { + func: unixepoch_fn, + arg_count: 0, + }; + + program.emit_insn(Insn::Function { + constant_mask: 0, + start_reg: 0, + dest: turso_cdc_registers + 1, + func: unixepoch_fn_ctx, + }); + + program.emit_int(1, turso_cdc_registers + 2); + program.mark_last_insn_constant(); + + program.emit_string8(table_name.0.clone(), turso_cdc_registers + 3); + program.mark_last_insn_constant(); + + program.emit_insn(Insn::Copy { + src_reg: rowid_reg, + dst_reg: turso_cdc_registers + 4, + amount: 0, + }); + + let rowid_reg = program.alloc_register(); + // todo(sivukhin): we **must** guarantee sequential generation or operation_id column + program.emit_insn(Insn::NewRowid { + cursor: *turso_cdc_cursor_id, + rowid_reg, + prev_largest_reg: 0, + }); + + let record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: turso_cdc_registers, + count: 5, + dest_reg: record_reg, + index_name: None, + }); + + program.emit_insn(Insn::Insert { + cursor: *turso_cdc_cursor_id, + key_reg: rowid_reg, + record_reg, + flag: InsertFlags::new(), + table_name: turso_cdc_btree.name.clone(), + }); + } + let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?; for index_col_mapping in index_col_mappings { // find which cursor we opened earlier for this index From a82529f55a62049322943880b53ed23bf3e30c68 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 15:02:18 +0400 Subject: [PATCH 08/20] emit cdc changes for UPDATE / DELETE statements --- core/translate/emitter.rs | 142 +++++++++++++++++++++++++++++++++++- core/translate/insert.rs | 71 +++--------------- core/translate/main_loop.rs | 28 ++++++- core/translate/subquery.rs | 1 + 4 files changed, 180 insertions(+), 62 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 17540ac3a..e3a649a22 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -31,7 +31,7 @@ use crate::vdbe::builder::{CursorKey, CursorType, ProgramBuilder}; use crate::vdbe::insn::{CmpInsFlags, IdxInsertFlags, InsertFlags, RegisterOrLiteral}; use crate::vdbe::CursorID; use crate::vdbe::{insn::Insn, BranchOffset}; -use crate::{Result, SymbolTable}; +use crate::{bail_parse_error, Result, SymbolTable}; pub struct Resolver<'a> { pub schema: &'a Schema, @@ -149,6 +149,8 @@ pub struct TranslateCtx<'a> { /// - First: all `GROUP BY` expressions, in the order they appear in the `GROUP BY` clause. /// - Then: remaining non-aggregate expressions that are not part of `GROUP BY`. pub non_aggregate_expressions: Vec<(&'a Expr, bool)>, + /// Cursor id for turso_cdc table (if capture_changes=on is set and query can modify the data) + pub cdc_cursor_id: Option, } impl<'a> TranslateCtx<'a> { @@ -175,6 +177,7 @@ impl<'a> TranslateCtx<'a> { result_columns_to_skip_in_orderby_sorter: None, resolver: Resolver::new(schema, syms), non_aggregate_expressions: Vec::new(), + cdc_cursor_id: None, } } } @@ -566,6 +569,22 @@ fn emit_delete_insns( } } + if let Some(turso_cdc_cursor_id) = t_ctx.cdc_cursor_id { + let rowid_reg = program.alloc_register(); + program.emit_insn(Insn::RowId { + cursor_id: main_table_cursor_id, + dest: rowid_reg, + }); + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::DELETE, + turso_cdc_cursor_id, + rowid_reg, + &table_reference.identifier, // is it OK to use identifier here? + )?; + } + program.emit_insn(Insn::Delete { cursor_id: main_table_cursor_id, }); @@ -1076,6 +1095,53 @@ fn emit_update_insns( }); } + if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { + let rowid_reg = program.alloc_register(); + if has_user_provided_rowid { + program.emit_insn(Insn::RowId { + cursor_id: cursor_id, + dest: rowid_reg, + }); + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::DELETE, + cdc_cursor_id, + rowid_reg, + &table_ref.identifier, // is it OK to use identifier here? + )?; + program.emit_insn(Insn::Copy { + src_reg: rowid_set_clause_reg.expect( + "rowid_set_clause_reg must be set because has_user_provided_rowid is true", + ), + dst_reg: rowid_reg, + amount: 1, + }); + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::INSERT, + cdc_cursor_id, + rowid_reg, + &table_ref.identifier, // is it OK to use identifier here? + )?; + } else { + program.emit_insn(Insn::Copy { + src_reg: rowid_set_clause_reg.unwrap_or(beg), + dst_reg: rowid_reg, + amount: 1, + }); + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::UPDATE, + cdc_cursor_id, + rowid_reg, + &table_ref.identifier, // is it OK to use identifier here? + )?; + } + } + // If we are updating the rowid, we cannot rely on overwrite on the // Insert instruction to update the cell. We need to first delete the current cell // and later insert the updated record @@ -1115,6 +1181,80 @@ fn emit_update_insns( Ok(()) } +pub fn emit_cdc_insns( + program: &mut ProgramBuilder, + resolver: &Resolver, + operation_mode: OperationMode, + cdc_cursor_id: usize, + rowid_reg: usize, + table_name: &str, +) -> Result<()> { + // (operation_id INTEGER PRIMARY KEY, operation_time INTEGER, operation_type INTEGER, table_name TEXT, row_key BLOB) + let turso_cdc_registers = program.alloc_registers(5); + program.emit_insn(Insn::Null { + dest: turso_cdc_registers, + dest_end: None, + }); + program.mark_last_insn_constant(); + + let Some(unixepoch_fn) = resolver.resolve_function("unixepoch", 0) else { + bail_parse_error!("no function {}", "unixepoch"); + }; + let unixepoch_fn_ctx = crate::function::FuncCtx { + func: unixepoch_fn, + arg_count: 0, + }; + + program.emit_insn(Insn::Function { + constant_mask: 0, + start_reg: 0, + dest: turso_cdc_registers + 1, + func: unixepoch_fn_ctx, + }); + + let operation_type = match operation_mode { + OperationMode::INSERT => 1, + OperationMode::UPDATE | OperationMode::SELECT => 0, + OperationMode::DELETE => -1, + }; + program.emit_int(operation_type, turso_cdc_registers + 2); + program.mark_last_insn_constant(); + + program.emit_string8(table_name.to_string(), turso_cdc_registers + 3); + program.mark_last_insn_constant(); + + program.emit_insn(Insn::Copy { + src_reg: rowid_reg, + dst_reg: turso_cdc_registers + 4, + amount: 0, + }); + + let rowid_reg = program.alloc_register(); + // todo(sivukhin): we **must** guarantee sequential generation or operation_id column + program.emit_insn(Insn::NewRowid { + cursor: cdc_cursor_id, + rowid_reg, + prev_largest_reg: 0, + }); + + let record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: turso_cdc_registers, + count: 5, + dest_reg: record_reg, + index_name: None, + }); + + program.emit_insn(Insn::Insert { + cursor: cdc_cursor_id, + key_reg: rowid_reg, + record_reg, + flag: InsertFlags::new(), + table_name: "turso_cdc".to_string(), + }); + Ok(()) +} + /// Initialize the limit/offset counters and registers. /// In case of compound SELECTs, the limit counter is initialized only once, /// hence [LimitCtx::initialize_counter] being false in those cases. diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 71be2164b..dffa513b7 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -6,11 +6,11 @@ use turso_sqlite3_parser::ast::{ use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY}; use crate::schema::{IndexColumn, Table}; +use crate::translate::emitter::{emit_cdc_insns, OperationMode}; use crate::util::normalize_ident; use crate::vdbe::builder::{ProgramBuilderFlags, ProgramBuilderOpts}; use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral}; use crate::vdbe::BranchOffset; -use crate::{bail_parse_error, Result, SymbolTable, VirtualTable}; use crate::{ schema::{Column, Schema}, vdbe::{ @@ -18,6 +18,7 @@ use crate::{ insn::Insn, }, }; +use crate::{Result, SymbolTable, VirtualTable}; use super::emitter::Resolver; use super::expr::{translate_expr, translate_expr_no_constant_opt, NoConstantOptReason}; @@ -350,7 +351,7 @@ pub fn translate_insert( if let Some((turso_cdc_cursor_id, turso_cdc_btree)) = &turso_cdc_table { program.emit_insn(Insn::OpenWrite { cursor_id: *turso_cdc_cursor_id, - root_page: RegisterOrLiteral::Literal(turso_cdc_btree.root_page), + root_page: turso_cdc_btree.root_page.into(), name: turso_cdc_btree.name.clone(), }); } @@ -442,65 +443,15 @@ pub fn translate_insert( } // Write record to the turso_cdc table if necessary - if let Some((turso_cdc_cursor_id, turso_cdc_btree)) = &turso_cdc_table { - // (operation_id INTEGER PRIMARY KEY, operation_time INTEGER, operation_type INTEGER, table_name TEXT, row_key BLOB) - let turso_cdc_registers = program.alloc_registers(5); - program.emit_insn(Insn::Null { - dest: turso_cdc_registers, - dest_end: None, - }); - program.mark_last_insn_constant(); - - let Some(unixepoch_fn) = resolver.resolve_function("unixepoch", 0) else { - bail_parse_error!("no function {}", "unixepoch"); - }; - let unixepoch_fn_ctx = crate::function::FuncCtx { - func: unixepoch_fn, - arg_count: 0, - }; - - program.emit_insn(Insn::Function { - constant_mask: 0, - start_reg: 0, - dest: turso_cdc_registers + 1, - func: unixepoch_fn_ctx, - }); - - program.emit_int(1, turso_cdc_registers + 2); - program.mark_last_insn_constant(); - - program.emit_string8(table_name.0.clone(), turso_cdc_registers + 3); - program.mark_last_insn_constant(); - - program.emit_insn(Insn::Copy { - src_reg: rowid_reg, - dst_reg: turso_cdc_registers + 4, - amount: 0, - }); - - let rowid_reg = program.alloc_register(); - // todo(sivukhin): we **must** guarantee sequential generation or operation_id column - program.emit_insn(Insn::NewRowid { - cursor: *turso_cdc_cursor_id, + if let Some((turso_cdc_cursor_id, _)) = &turso_cdc_table { + emit_cdc_insns( + &mut program, + &resolver, + OperationMode::INSERT, + *turso_cdc_cursor_id, rowid_reg, - prev_largest_reg: 0, - }); - - let record_reg = program.alloc_register(); - program.emit_insn(Insn::MakeRecord { - start_reg: turso_cdc_registers, - count: 5, - dest_reg: record_reg, - index_name: None, - }); - - program.emit_insn(Insn::Insert { - cursor: *turso_cdc_cursor_id, - key_reg: rowid_reg, - record_reg, - flag: InsertFlags::new(), - table_name: turso_cdc_btree.name.clone(), - }); + &table_name.0, + )?; } let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?; diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index 88c18c054..ef1280d75 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -11,7 +11,7 @@ use crate::{ }, types::SeekOp, vdbe::{ - builder::{CursorKey, CursorType, ProgramBuilder}, + builder::{CursorKey, CursorType, ProgramBuilder, ProgramBuilderFlags}, insn::{CmpInsFlags, IdxInsertFlags, Insn}, BranchOffset, CursorID, }, @@ -117,6 +117,32 @@ pub fn init_loop( t_ctx.meta_left_joins.len() == tables.joined_tables().len(), "meta_left_joins length does not match tables length" ); + + let capture_changes = program + .flags() + .contains(ProgramBuilderFlags::CaptureChanges); + if capture_changes + && matches!( + mode, + OperationMode::INSERT | OperationMode::UPDATE | OperationMode::DELETE + ) + { + let Some(turso_cdc_table) = t_ctx.resolver.schema.get_table("turso_cdc") else { + crate::bail_parse_error!("no such table: {}", "turso_cdc"); + }; + let Some(turso_cdc_btree) = turso_cdc_table.btree().clone() else { + crate::bail_parse_error!("no such table: {}", "turso_cdc"); + }; + let turso_cdc_cursor_id = + program.alloc_cursor_id(CursorType::BTreeTable(turso_cdc_btree.clone())); + program.emit_insn(Insn::OpenWrite { + cursor_id: turso_cdc_cursor_id, + root_page: turso_cdc_btree.root_page.into(), + name: turso_cdc_btree.name.clone(), + }); + t_ctx.cdc_cursor_id = Some(turso_cdc_cursor_id); + } + // Initialize ephemeral indexes for distinct aggregates for (i, agg) in aggregates .iter_mut() diff --git a/core/translate/subquery.rs b/core/translate/subquery.rs index 4004a7cd1..645c95f6e 100644 --- a/core/translate/subquery.rs +++ b/core/translate/subquery.rs @@ -82,6 +82,7 @@ pub fn emit_subquery( reg_limit_offset_sum: None, resolver: Resolver::new(t_ctx.resolver.schema, t_ctx.resolver.symbol_table), non_aggregate_expressions: Vec::new(), + cdc_cursor_id: None, }; let subquery_body_end_label = program.allocate_label(); program.emit_insn(Insn::InitCoroutine { From 04f2efeaa4117c32c13640df7ee15d6cea787b73 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 15:04:31 +0400 Subject: [PATCH 09/20] small renames --- core/lib.rs | 14 +++++++------- core/pragma.rs | 4 ++-- core/translate/emitter.rs | 2 +- core/translate/insert.rs | 6 +++--- core/translate/main_loop.rs | 6 +++--- core/translate/mod.rs | 4 ++-- core/translate/pragma.rs | 10 +++++----- core/vdbe/builder.rs | 2 +- vendored/sqlite3-parser/src/parser/ast/mod.rs | 2 +- 9 files changed, 25 insertions(+), 25 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 9fd1dddb9..0bd2fa435 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -278,7 +278,7 @@ impl Database { cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), - capture_changes: Cell::new(false), + capture_data_changes: Cell::new(false), }); if let Err(e) = conn.register_builtins() { return Err(LimboError::ExtensionError(e)); @@ -331,7 +331,7 @@ impl Database { cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), - capture_changes: Cell::new(false), + capture_data_changes: Cell::new(false), }); if let Err(e) = conn.register_builtins() { @@ -452,7 +452,7 @@ pub struct Connection { cache_size: Cell, readonly: Cell, wal_checkpoint_disabled: Cell, - capture_changes: Cell, + capture_data_changes: Cell, } impl Connection { @@ -727,11 +727,11 @@ impl Connection { self.cache_size.set(size); } - pub fn get_capture_changes(&self) -> bool { - self.capture_changes.get() + pub fn get_capture_data_changes(&self) -> bool { + self.capture_data_changes.get() } - pub fn set_capture_changes(&self, value: bool) { - self.capture_changes.set(value); + pub fn set_capture_data_changes(&self, value: bool) { + self.capture_data_changes.set(value); } #[cfg(feature = "fs")] diff --git a/core/pragma.rs b/core/pragma.rs index dca8bc169..6efbc8b62 100644 --- a/core/pragma.rs +++ b/core/pragma.rs @@ -77,9 +77,9 @@ fn pragma_for(pragma: PragmaName) -> Pragma { PragmaFlags::NeedSchema | PragmaFlags::ReadOnly | PragmaFlags::Result0, &["message"], ), - CaptureChanges => Pragma::new( + CaptureDataChanges => Pragma::new( PragmaFlags::NeedSchema | PragmaFlags::Result0 | PragmaFlags::SchemaReq, - &["capture_changes"], + &["capture_data_changes"], ), } } diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index e3a649a22..4b095bde8 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -149,7 +149,7 @@ pub struct TranslateCtx<'a> { /// - First: all `GROUP BY` expressions, in the order they appear in the `GROUP BY` clause. /// - Then: remaining non-aggregate expressions that are not part of `GROUP BY`. pub non_aggregate_expressions: Vec<(&'a Expr, bool)>, - /// Cursor id for turso_cdc table (if capture_changes=on is set and query can modify the data) + /// Cursor id for turso_cdc table (if capture_data_changes=on is set and query can modify the data) pub cdc_cursor_id: Option, } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index dffa513b7..04e8401a9 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -117,10 +117,10 @@ pub fn translate_insert( let halt_label = program.allocate_label(); let loop_start_label = program.allocate_label(); - let capture_changes = program + let capture_data_changes = program .flags() - .contains(ProgramBuilderFlags::CaptureChanges); - let turso_cdc_table = if capture_changes { + .contains(ProgramBuilderFlags::CaptureDataChanges); + let turso_cdc_table = if capture_data_changes { let Some(turso_cdc_table) = schema.get_table("turso_cdc") else { crate::bail_parse_error!("no such table: {}", "turso_cdc"); }; diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index ef1280d75..ba9930f8c 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -118,10 +118,10 @@ pub fn init_loop( "meta_left_joins length does not match tables length" ); - let capture_changes = program + let capture_data_changes = program .flags() - .contains(ProgramBuilderFlags::CaptureChanges); - if capture_changes + .contains(ProgramBuilderFlags::CaptureDataChanges); + if capture_data_changes && matches!( mode, OperationMode::INSERT | OperationMode::UPDATE | OperationMode::DELETE diff --git a/core/translate/mod.rs b/core/translate/mod.rs index 62129d51f..a2953b2b2 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -73,8 +73,8 @@ pub fn translate( | ast::Stmt::Update(..) ); - let flags = if connection.get_capture_changes() { - ProgramBuilderFlags::CaptureChanges + let flags = if connection.get_capture_data_changes() { + ProgramBuilderFlags::CaptureDataChanges } else { ProgramBuilderFlags::empty() }; diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 2d563c3c5..92faab95e 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -207,11 +207,11 @@ fn update_pragma( Ok(program) } PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"), - PragmaName::CaptureChanges => { + PragmaName::CaptureDataChanges => { let value = parse_pragma_bool(&value)?; - // todo(sivukhin): ideally, we should consistently update capture_changes connection flag only after successfull execution of schema change statement + // todo(sivukhin): ideally, we should consistently update capture_data_changes connection flag only after successfull execution of schema change statement // but for now, let's keep it as is... - connection.set_capture_changes(value); + connection.set_capture_data_changes(value); if value { // make sure that we have turso_cdc table created let columns = vec![ @@ -429,8 +429,8 @@ fn query_pragma( PragmaName::IntegrityCheck => { translate_integrity_check(schema, &mut program)?; } - PragmaName::CaptureChanges => { - program.emit_bool(connection.get_capture_changes(), register); + PragmaName::CaptureDataChanges => { + program.emit_bool(connection.get_capture_data_changes(), register); program.emit_result_row(register, 1); program.add_pragma_result_column(pragma.to_string()); } diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 4a32c9dad..795b88a8b 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -137,7 +137,7 @@ pub enum QueryMode { bitflags! { pub struct ProgramBuilderFlags: u8 { - const CaptureChanges = 0x01; /* emit plans with capture changes instructurs for INSERT/DELETE/UPDATE operations */ + const CaptureDataChanges = 0x01; /* emit plans with capture data changes instructions for INSERT/DELETE/UPDATE statements */ } } diff --git a/vendored/sqlite3-parser/src/parser/ast/mod.rs b/vendored/sqlite3-parser/src/parser/ast/mod.rs index 619689fb0..3dc8fb818 100644 --- a/vendored/sqlite3-parser/src/parser/ast/mod.rs +++ b/vendored/sqlite3-parser/src/parser/ast/mod.rs @@ -1766,7 +1766,7 @@ pub enum PragmaName { /// trigger a checkpoint to run on database(s) if WAL is enabled WalCheckpoint, /// enable capture-changes logic for the connection - CaptureChanges, + CaptureDataChanges, } /// `CREATE TRIGGER` time From 40769618c13bec3686a01ece7579f1aad0b9dbd7 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 15:10:14 +0400 Subject: [PATCH 10/20] small refactoring --- core/translate/emitter.rs | 3 +- core/translate/insert.rs | 7 ++- core/translate/main_loop.rs | 7 ++- core/translate/pragma.rs | 108 +++++++++++++++++++----------------- 4 files changed, 66 insertions(+), 59 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 4b095bde8..a083a5677 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -25,6 +25,7 @@ use crate::function::Func; use crate::schema::Schema; use crate::translate::compound_select::emit_program_for_compound_select; use crate::translate::plan::{DeletePlan, Plan, QueryDestination, Search}; +use crate::translate::pragma::TURSO_CDC_TABLE_NAME; use crate::translate::values::emit_values; use crate::util::exprs_are_equivalent; use crate::vdbe::builder::{CursorKey, CursorType, ProgramBuilder}; @@ -1250,7 +1251,7 @@ pub fn emit_cdc_insns( key_reg: rowid_reg, record_reg, flag: InsertFlags::new(), - table_name: "turso_cdc".to_string(), + table_name: TURSO_CDC_TABLE_NAME.to_string(), }); Ok(()) } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 04e8401a9..05684147b 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -7,6 +7,7 @@ use turso_sqlite3_parser::ast::{ use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY}; use crate::schema::{IndexColumn, Table}; use crate::translate::emitter::{emit_cdc_insns, OperationMode}; +use crate::translate::pragma::TURSO_CDC_TABLE_NAME; use crate::util::normalize_ident; use crate::vdbe::builder::{ProgramBuilderFlags, ProgramBuilderOpts}; use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral}; @@ -121,11 +122,11 @@ pub fn translate_insert( .flags() .contains(ProgramBuilderFlags::CaptureDataChanges); let turso_cdc_table = if capture_data_changes { - let Some(turso_cdc_table) = schema.get_table("turso_cdc") else { - crate::bail_parse_error!("no such table: {}", "turso_cdc"); + let Some(turso_cdc_table) = schema.get_table(TURSO_CDC_TABLE_NAME) else { + crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); }; let Some(turso_cdc_btree) = turso_cdc_table.btree().clone() else { - crate::bail_parse_error!("no such table: {}", "turso_cdc"); + crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); }; Some(( program.alloc_cursor_id(CursorType::BTreeTable(turso_cdc_btree.clone())), diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index ba9930f8c..d26824763 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -7,6 +7,7 @@ use crate::{ schema::{Affinity, Index, IndexColumn, Table}, translate::{ plan::{DistinctCtx, Distinctness}, + pragma::TURSO_CDC_TABLE_NAME, result_row::emit_select_result, }, types::SeekOp, @@ -127,11 +128,11 @@ pub fn init_loop( OperationMode::INSERT | OperationMode::UPDATE | OperationMode::DELETE ) { - let Some(turso_cdc_table) = t_ctx.resolver.schema.get_table("turso_cdc") else { - crate::bail_parse_error!("no such table: {}", "turso_cdc"); + let Some(turso_cdc_table) = t_ctx.resolver.schema.get_table(TURSO_CDC_TABLE_NAME) else { + crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); }; let Some(turso_cdc_btree) = turso_cdc_table.btree().clone() else { - crate::bail_parse_error!("no such table: {}", "turso_cdc"); + crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); }; let turso_cdc_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(turso_cdc_btree.clone())); diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 92faab95e..8d486cc44 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use std::sync::Arc; -use turso_sqlite3_parser::ast::{self, Expr}; +use turso_sqlite3_parser::ast::{self, ColumnDefinition, Expr}; use turso_sqlite3_parser::ast::{PragmaName, QualifiedName}; use crate::schema::Schema; @@ -214,60 +214,11 @@ fn update_pragma( connection.set_capture_data_changes(value); if value { // make sure that we have turso_cdc table created - let columns = vec![ - ast::ColumnDefinition { - col_name: ast::Name("operation_id".to_string()), - col_type: Some(ast::Type { - name: "INTEGER".to_string(), - size: None, - }), - constraints: vec![ast::NamedColumnConstraint { - name: None, - constraint: ast::ColumnConstraint::PrimaryKey { - order: None, - conflict_clause: None, - auto_increment: false, - }, - }], - }, - ast::ColumnDefinition { - col_name: ast::Name("operation_time".to_string()), - col_type: Some(ast::Type { - name: "INTEGER".to_string(), - size: None, - }), - constraints: vec![], - }, - ast::ColumnDefinition { - col_name: ast::Name("operation_type".to_string()), - col_type: Some(ast::Type { - name: "INTEGER".to_string(), - size: None, - }), - constraints: vec![], - }, - ast::ColumnDefinition { - col_name: ast::Name("table_name".to_string()), - col_type: Some(ast::Type { - name: "TEXT".to_string(), - size: None, - }), - constraints: vec![], - }, - ast::ColumnDefinition { - col_name: ast::Name("row_key".to_string()), - col_type: Some(ast::Type { - name: "BLOB".to_string(), - size: None, - }), - constraints: vec![], - }, - ]; return translate_create_table( - QualifiedName::single(ast::Name("turso_cdc".into())), + QualifiedName::single(ast::Name(TURSO_CDC_TABLE_NAME.into())), false, ast::CreateTableBody::columns_and_constraints_from_definition( - columns, + turso_cdc_table_columns(), None, ast::TableOptions::NONE, ) @@ -502,3 +453,56 @@ fn update_cache_size( Ok(()) } + +pub const TURSO_CDC_TABLE_NAME: &str = "turso_cdc"; +fn turso_cdc_table_columns() -> Vec { + vec![ + ast::ColumnDefinition { + col_name: ast::Name("operation_id".to_string()), + col_type: Some(ast::Type { + name: "INTEGER".to_string(), + size: None, + }), + constraints: vec![ast::NamedColumnConstraint { + name: None, + constraint: ast::ColumnConstraint::PrimaryKey { + order: None, + conflict_clause: None, + auto_increment: false, + }, + }], + }, + ast::ColumnDefinition { + col_name: ast::Name("operation_time".to_string()), + col_type: Some(ast::Type { + name: "INTEGER".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("operation_type".to_string()), + col_type: Some(ast::Type { + name: "INTEGER".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("table_name".to_string()), + col_type: Some(ast::Type { + name: "TEXT".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("row_key".to_string()), + col_type: Some(ast::Type { + name: "BLOB".to_string(), + size: None, + }), + constraints: vec![], + }, + ] +} From 271b8e5bcdba08c2bb084b23bcf875738d6ad276 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 15:16:40 +0400 Subject: [PATCH 11/20] fix clippy --- core/translate/emitter.rs | 2 +- core/util.rs | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index a083a5677..47f4ae0b4 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -1100,7 +1100,7 @@ fn emit_update_insns( let rowid_reg = program.alloc_register(); if has_user_provided_rowid { program.emit_insn(Insn::RowId { - cursor_id: cursor_id, + cursor_id, dest: rowid_reg, }); emit_cdc_insns( diff --git a/core/util.rs b/core/util.rs index 515c05598..52cd36bb1 100644 --- a/core/util.rs +++ b/core/util.rs @@ -1051,15 +1051,13 @@ pub fn parse_pragma_bool(expr: &Expr) -> Result { if let Value::Integer(x @ (0 | 1)) = number { return Ok(x != 0); } - } else { - if let Expr::Name(name) = expr { - let ident = normalize_ident(&name.0); - if TRUE_VALUES.contains(&ident.as_str()) { - return Ok(true); - } - if FALSE_VALUES.contains(&ident.as_str()) { - return Ok(false); - } + } else if let Expr::Name(name) = expr { + let ident = normalize_ident(&name.0); + if TRUE_VALUES.contains(&ident.as_str()) { + return Ok(true); + } + if FALSE_VALUES.contains(&ident.as_str()) { + return Ok(false); } } Err(LimboError::InvalidArgument( From 6a6276878c46b34ac1ad80cc82ca856fda45838c Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 15:24:16 +0400 Subject: [PATCH 12/20] fix test --- vendored/sqlite3-parser/src/parser/ast/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vendored/sqlite3-parser/src/parser/ast/mod.rs b/vendored/sqlite3-parser/src/parser/ast/mod.rs index 3dc8fb818..25fdb6897 100644 --- a/vendored/sqlite3-parser/src/parser/ast/mod.rs +++ b/vendored/sqlite3-parser/src/parser/ast/mod.rs @@ -1747,6 +1747,8 @@ pub enum PragmaName { AutoVacuum, /// `cache_size` pragma CacheSize, + /// enable capture-changes logic for the connection + CaptureDataChanges, /// Run integrity check on the database file IntegrityCheck, /// `journal_mode` pragma @@ -1765,8 +1767,6 @@ pub enum PragmaName { UserVersion, /// trigger a checkpoint to run on database(s) if WAL is enabled WalCheckpoint, - /// enable capture-changes logic for the connection - CaptureDataChanges, } /// `CREATE TRIGGER` time From a3732939bd26c51accf5c17462f7248ea80d303e Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 2 Jul 2025 15:30:53 +0400 Subject: [PATCH 13/20] fix clippy again --- core/util.rs | 41 +++++++++-------------------------------- 1 file changed, 9 insertions(+), 32 deletions(-) diff --git a/core/util.rs b/core/util.rs index 52cd36bb1..87a1ac0fb 100644 --- a/core/util.rs +++ b/core/util.rs @@ -2056,38 +2056,15 @@ pub mod tests { #[test] fn test_parse_pragma_bool() { - assert_eq!( - parse_pragma_bool(&Expr::Literal(Literal::Numeric("1".into()))).unwrap(), - true - ); - assert_eq!( - parse_pragma_bool(&Expr::Name(Name("true".into()))).unwrap(), - true - ); - assert_eq!( - parse_pragma_bool(&Expr::Name(Name("on".into()))).unwrap(), - true - ); - assert_eq!( - parse_pragma_bool(&Expr::Name(Name("yes".into()))).unwrap(), - true - ); - assert_eq!( - parse_pragma_bool(&Expr::Literal(Literal::Numeric("0".into()))).unwrap(), - false - ); - assert_eq!( - parse_pragma_bool(&Expr::Name(Name("false".into()))).unwrap(), - false - ); - assert_eq!( - parse_pragma_bool(&Expr::Name(Name("off".into()))).unwrap(), - false - ); - assert_eq!( - parse_pragma_bool(&Expr::Name(Name("no".into()))).unwrap(), - false - ); + assert!(parse_pragma_bool(&Expr::Literal(Literal::Numeric("1".into()))).unwrap(),); + assert!(parse_pragma_bool(&Expr::Name(Name("true".into()))).unwrap(),); + assert!(parse_pragma_bool(&Expr::Name(Name("on".into()))).unwrap(),); + assert!(parse_pragma_bool(&Expr::Name(Name("yes".into()))).unwrap(),); + + assert!(!parse_pragma_bool(&Expr::Literal(Literal::Numeric("0".into()))).unwrap(),); + assert!(!parse_pragma_bool(&Expr::Name(Name("false".into()))).unwrap(),); + assert!(!parse_pragma_bool(&Expr::Name(Name("off".into()))).unwrap(),); + assert!(!parse_pragma_bool(&Expr::Name(Name("no".into()))).unwrap(),); assert!(parse_pragma_bool(&Expr::Name(Name("nono".into()))).is_err()); assert!(parse_pragma_bool(&Expr::Name(Name("10".into()))).is_err()); From a988bbaffeef0240843c23e0ac2e9bae682923ba Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Sun, 6 Jul 2025 22:19:32 +0400 Subject: [PATCH 14/20] allow to specify table in the capture_data_changes PRAGMA --- core/lib.rs | 48 +++++++++++++++++++++++++++++++------ core/pragma.rs | 12 +++++----- core/translate/insert.rs | 33 ++++++++++++------------- core/translate/main_loop.rs | 29 ++++++++++------------ core/translate/mod.rs | 9 ++----- core/translate/pragma.rs | 35 +++++++++++++++++---------- core/util.rs | 13 ++++++++++ core/vdbe/builder.rs | 19 +++++---------- 8 files changed, 119 insertions(+), 79 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 0bd2fa435..ca9354744 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -43,6 +43,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use crate::storage::{header_accessor, wal::DummyWAL}; use crate::translate::optimizer::optimize_plan; +use crate::translate::pragma::TURSO_CDC_TABLE_NAME; use crate::util::{OpenMode, OpenOptions}; use crate::vtab::VirtualTable; use core::str; @@ -278,7 +279,7 @@ impl Database { cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), - capture_data_changes: Cell::new(false), + capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), }); if let Err(e) = conn.register_builtins() { return Err(LimboError::ExtensionError(e)); @@ -331,7 +332,7 @@ impl Database { cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), - capture_data_changes: Cell::new(false), + capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), }); if let Err(e) = conn.register_builtins() { @@ -436,6 +437,39 @@ fn get_schema_version(conn: &Arc, io: &Arc) -> Result { } } +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum CaptureDataChangesMode { + Off, + RowidOnly { table: String }, +} + +impl CaptureDataChangesMode { + pub fn parse(value: &str) -> Result { + let (mode, table) = value + .split_once(",") + .unwrap_or((value, TURSO_CDC_TABLE_NAME)); + match mode { + "off" => Ok(CaptureDataChangesMode::Off), + "rowid-only" => Ok(CaptureDataChangesMode::RowidOnly { table: table.to_string() }), + _ => Err(LimboError::InvalidArgument( + "unexpected pragma value: expected '' or ',' parameter where mode is one of off|rowid-only".to_string(), + )) + } + } + pub fn mode_name(&self) -> &str { + match self { + CaptureDataChangesMode::Off => "off", + CaptureDataChangesMode::RowidOnly { .. } => "rowid-only", + } + } + pub fn table(&self) -> Option<&str> { + match self { + CaptureDataChangesMode::Off => None, + CaptureDataChangesMode::RowidOnly { table } => Some(table.as_str()), + } + } +} + pub struct Connection { _db: Arc, pager: Rc, @@ -452,7 +486,7 @@ pub struct Connection { cache_size: Cell, readonly: Cell, wal_checkpoint_disabled: Cell, - capture_data_changes: Cell, + capture_data_changes: RefCell, } impl Connection { @@ -727,11 +761,11 @@ impl Connection { self.cache_size.set(size); } - pub fn get_capture_data_changes(&self) -> bool { - self.capture_data_changes.get() + pub fn get_capture_data_changes(&self) -> std::cell::Ref<'_, CaptureDataChangesMode> { + self.capture_data_changes.borrow() } - pub fn set_capture_data_changes(&self, value: bool) { - self.capture_data_changes.set(value); + pub fn set_capture_data_changes(&self, opts: CaptureDataChangesMode) { + self.capture_data_changes.replace(opts); } #[cfg(feature = "fs")] diff --git a/core/pragma.rs b/core/pragma.rs index 6efbc8b62..4f9ea1dbb 100644 --- a/core/pragma.rs +++ b/core/pragma.rs @@ -7,7 +7,7 @@ use turso_sqlite3_parser::ast::PragmaName; bitflags! { // Flag names match those used in SQLite: // https://github.com/sqlite/sqlite/blob/b3c1884b65400da85636458298bd77cbbfdfb401/tool/mkpragmatab.tcl#L22-L29 - struct PragmaFlags: u8 { + pub struct PragmaFlags: u8 { const NeedSchema = 0x01; /* Force schema load before running */ const NoColumns = 0x02; /* OP_ResultRow called with zero columns */ const NoColumns1 = 0x04; /* zero columns if RHS argument is present */ @@ -19,9 +19,9 @@ bitflags! { } } -struct Pragma { - flags: PragmaFlags, - columns: &'static [&'static str], +pub struct Pragma { + pub flags: PragmaFlags, + pub columns: &'static [&'static str], } impl Pragma { @@ -30,7 +30,7 @@ impl Pragma { } } -fn pragma_for(pragma: PragmaName) -> Pragma { +pub fn pragma_for(pragma: PragmaName) -> Pragma { use PragmaName::*; match pragma { @@ -79,7 +79,7 @@ fn pragma_for(pragma: PragmaName) -> Pragma { ), CaptureDataChanges => Pragma::new( PragmaFlags::NeedSchema | PragmaFlags::Result0 | PragmaFlags::SchemaReq, - &["capture_data_changes"], + &["mode", "table"], ), } } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 05684147b..cba811387 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -7,9 +7,8 @@ use turso_sqlite3_parser::ast::{ use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY}; use crate::schema::{IndexColumn, Table}; use crate::translate::emitter::{emit_cdc_insns, OperationMode}; -use crate::translate::pragma::TURSO_CDC_TABLE_NAME; use crate::util::normalize_ident; -use crate::vdbe::builder::{ProgramBuilderFlags, ProgramBuilderOpts}; +use crate::vdbe::builder::ProgramBuilderOpts; use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral}; use crate::vdbe::BranchOffset; use crate::{ @@ -118,19 +117,17 @@ pub fn translate_insert( let halt_label = program.allocate_label(); let loop_start_label = program.allocate_label(); - let capture_data_changes = program - .flags() - .contains(ProgramBuilderFlags::CaptureDataChanges); - let turso_cdc_table = if capture_data_changes { - let Some(turso_cdc_table) = schema.get_table(TURSO_CDC_TABLE_NAME) else { - crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); + let cdc_table = program.capture_data_changes_mode().table(); + let cdc_table = if let Some(cdc_table) = cdc_table { + let Some(turso_cdc_table) = schema.get_table(&cdc_table) else { + crate::bail_parse_error!("no such table: {}", cdc_table); }; - let Some(turso_cdc_btree) = turso_cdc_table.btree().clone() else { - crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); + let Some(cdc_btree) = turso_cdc_table.btree().clone() else { + crate::bail_parse_error!("no such table: {}", cdc_table); }; Some(( - program.alloc_cursor_id(CursorType::BTreeTable(turso_cdc_btree.clone())), - turso_cdc_btree, + program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())), + cdc_btree, )) } else { None @@ -349,11 +346,11 @@ pub fn translate_insert( )?; } // Open turso_cdc table btree for writing if necessary - if let Some((turso_cdc_cursor_id, turso_cdc_btree)) = &turso_cdc_table { + if let Some((cdc_cursor_id, cdc_btree)) = &cdc_table { program.emit_insn(Insn::OpenWrite { - cursor_id: *turso_cdc_cursor_id, - root_page: turso_cdc_btree.root_page.into(), - name: turso_cdc_btree.name.clone(), + cursor_id: *cdc_cursor_id, + root_page: cdc_btree.root_page.into(), + name: cdc_btree.name.clone(), }); } @@ -444,12 +441,12 @@ pub fn translate_insert( } // Write record to the turso_cdc table if necessary - if let Some((turso_cdc_cursor_id, _)) = &turso_cdc_table { + if let Some((cdc_cursor_id, _)) = &cdc_table { emit_cdc_insns( &mut program, &resolver, OperationMode::INSERT, - *turso_cdc_cursor_id, + *cdc_cursor_id, rowid_reg, &table_name.0, )?; diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index d26824763..dcb04198f 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -7,12 +7,11 @@ use crate::{ schema::{Affinity, Index, IndexColumn, Table}, translate::{ plan::{DistinctCtx, Distinctness}, - pragma::TURSO_CDC_TABLE_NAME, result_row::emit_select_result, }, types::SeekOp, vdbe::{ - builder::{CursorKey, CursorType, ProgramBuilder, ProgramBuilderFlags}, + builder::{CursorKey, CursorType, ProgramBuilder}, insn::{CmpInsFlags, IdxInsertFlags, Insn}, BranchOffset, CursorID, }, @@ -119,29 +118,27 @@ pub fn init_loop( "meta_left_joins length does not match tables length" ); - let capture_data_changes = program - .flags() - .contains(ProgramBuilderFlags::CaptureDataChanges); - if capture_data_changes + let cdc_table = program.capture_data_changes_mode().table(); + if cdc_table.is_some() && matches!( mode, OperationMode::INSERT | OperationMode::UPDATE | OperationMode::DELETE ) { - let Some(turso_cdc_table) = t_ctx.resolver.schema.get_table(TURSO_CDC_TABLE_NAME) else { - crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); + let cdc_table_name = cdc_table.unwrap(); + let Some(cdc_table) = t_ctx.resolver.schema.get_table(cdc_table_name) else { + crate::bail_parse_error!("no such table: {}", cdc_table_name); }; - let Some(turso_cdc_btree) = turso_cdc_table.btree().clone() else { - crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); + let Some(cdc_btree) = cdc_table.btree().clone() else { + crate::bail_parse_error!("no such table: {}", cdc_table_name); }; - let turso_cdc_cursor_id = - program.alloc_cursor_id(CursorType::BTreeTable(turso_cdc_btree.clone())); + let cdc_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())); program.emit_insn(Insn::OpenWrite { - cursor_id: turso_cdc_cursor_id, - root_page: turso_cdc_btree.root_page.into(), - name: turso_cdc_btree.name.clone(), + cursor_id: cdc_cursor_id, + root_page: cdc_btree.root_page.into(), + name: cdc_btree.name.clone(), }); - t_ctx.cdc_cursor_id = Some(turso_cdc_cursor_id); + t_ctx.cdc_cursor_id = Some(cdc_cursor_id); } // Initialize ephemeral indexes for distinct aggregates diff --git a/core/translate/mod.rs b/core/translate/mod.rs index a2953b2b2..6a45e96f3 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -37,7 +37,7 @@ mod values; use crate::schema::Schema; use crate::storage::pager::Pager; use crate::translate::delete::translate_delete; -use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderFlags, ProgramBuilderOpts, QueryMode}; +use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts, QueryMode}; use crate::vdbe::Program; use crate::{bail_parse_error, Connection, Result, SymbolTable}; use alter::translate_alter_table; @@ -73,14 +73,9 @@ pub fn translate( | ast::Stmt::Update(..) ); - let flags = if connection.get_capture_data_changes() { - ProgramBuilderFlags::CaptureDataChanges - } else { - ProgramBuilderFlags::empty() - }; let mut program = ProgramBuilder::new( query_mode, - flags, + connection.get_capture_data_changes().clone(), // These options will be extended whithin each translate program ProgramBuilderOpts { num_cursors: 1, diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 8d486cc44..7e8e1e0ee 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -6,15 +6,16 @@ use std::sync::Arc; use turso_sqlite3_parser::ast::{self, ColumnDefinition, Expr}; use turso_sqlite3_parser::ast::{PragmaName, QualifiedName}; +use crate::pragma::pragma_for; use crate::schema::Schema; use crate::storage::pager::AutoVacuumMode; use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE; use crate::storage::wal::CheckpointMode; use crate::translate::schema::translate_create_table; -use crate::util::{normalize_ident, parse_pragma_bool, parse_signed_number}; +use crate::util::{normalize_ident, parse_signed_number, parse_string}; use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts}; use crate::vdbe::insn::{Cookie, Insn}; -use crate::{bail_parse_error, storage, LimboError, Value}; +use crate::{bail_parse_error, storage, CaptureDataChangesMode, LimboError, Value}; use std::str::FromStr; use strum::IntoEnumIterator; @@ -208,14 +209,14 @@ fn update_pragma( } PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"), PragmaName::CaptureDataChanges => { - let value = parse_pragma_bool(&value)?; + let value = parse_string(&value)?; // todo(sivukhin): ideally, we should consistently update capture_data_changes connection flag only after successfull execution of schema change statement // but for now, let's keep it as is... - connection.set_capture_data_changes(value); - if value { - // make sure that we have turso_cdc table created - return translate_create_table( - QualifiedName::single(ast::Name(TURSO_CDC_TABLE_NAME.into())), + let opts = CaptureDataChangesMode::parse(&value)?; + if let Some(table) = &opts.table() { + // make sure that we have table created + program = translate_create_table( + QualifiedName::single(ast::Name(table.to_string())), false, ast::CreateTableBody::columns_and_constraints_from_definition( turso_cdc_table_columns(), @@ -226,8 +227,9 @@ fn update_pragma( true, schema, program, - ); + )?; } + connection.set_capture_data_changes(opts); Ok(program) } } @@ -381,9 +383,18 @@ fn query_pragma( translate_integrity_check(schema, &mut program)?; } PragmaName::CaptureDataChanges => { - program.emit_bool(connection.get_capture_data_changes(), register); - program.emit_result_row(register, 1); - program.add_pragma_result_column(pragma.to_string()); + let pragma = pragma_for(pragma); + let second_column = program.alloc_register(); + let opts = connection.get_capture_data_changes(); + program.emit_string8(opts.mode_name().to_string(), register); + if let Some(table) = &opts.table() { + program.emit_string8(table.to_string(), second_column); + } else { + program.emit_null(second_column, None); + } + program.emit_result_row(register, 2); + program.add_pragma_result_column(pragma.columns[0].to_string()); + program.add_pragma_result_column(pragma.columns[1].to_string()); } } diff --git a/core/util.rs b/core/util.rs index 87a1ac0fb..d25cac606 100644 --- a/core/util.rs +++ b/core/util.rs @@ -1044,6 +1044,19 @@ pub fn parse_signed_number(expr: &Expr) -> Result { } } +pub fn parse_string(expr: &Expr) -> Result { + match expr { + Expr::Name(ast::Name(s)) if s.len() >= 2 && s.starts_with("'") && s.ends_with("'") => { + Ok(s[1..s.len() - 1].to_string()) + } + _ => Err(LimboError::InvalidArgument(format!( + "string parameter expected, got {:?} instead", + expr + ))), + } +} + +#[allow(unused)] pub fn parse_pragma_bool(expr: &Expr) -> Result { const TRUE_VALUES: &[&str] = &["yes", "true", "on"]; const FALSE_VALUES: &[&str] = &["no", "false", "off"]; diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 795b88a8b..620c8cb19 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -1,6 +1,5 @@ use std::{cell::Cell, cmp::Ordering, rc::Rc, sync::Arc}; -use bitflags::bitflags; use tracing::{instrument, Level}; use turso_sqlite3_parser::ast::{self, TableInternalId}; @@ -13,7 +12,7 @@ use crate::{ emitter::TransactionMode, plan::{ResultSetColumn, TableReferences}, }, - Connection, Value, VirtualTable, + CaptureDataChangesMode, Connection, Value, VirtualTable, }; #[derive(Default)] @@ -111,7 +110,7 @@ pub struct ProgramBuilder { nested_level: usize, init_label: BranchOffset, start_offset: BranchOffset, - flags: ProgramBuilderFlags, + capture_data_changes_mode: CaptureDataChangesMode, } #[derive(Debug, Clone)] @@ -135,12 +134,6 @@ pub enum QueryMode { Explain, } -bitflags! { - pub struct ProgramBuilderFlags: u8 { - const CaptureDataChanges = 0x01; /* emit plans with capture data changes instructions for INSERT/DELETE/UPDATE statements */ - } -} - impl From for QueryMode { fn from(stmt: ast::Cmd) -> Self { match stmt { @@ -159,7 +152,7 @@ pub struct ProgramBuilderOpts { impl ProgramBuilder { pub fn new( query_mode: QueryMode, - flags: ProgramBuilderFlags, + capture_data_changes_mode: CaptureDataChangesMode, opts: ProgramBuilderOpts, ) -> Self { Self { @@ -184,12 +177,12 @@ impl ProgramBuilder { // These labels will be filled when `prologue()` is called init_label: BranchOffset::Placeholder, start_offset: BranchOffset::Placeholder, - flags, + capture_data_changes_mode, } } - pub fn flags(&self) -> &ProgramBuilderFlags { - &self.flags + pub fn capture_data_changes_mode(&self) -> &CaptureDataChangesMode { + &self.capture_data_changes_mode } pub fn extend(&mut self, opts: &ProgramBuilderOpts) { From 32fa2ac3ee3914ffaaa1ba9a0065d3e54602e5ab Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Sun, 6 Jul 2025 22:24:35 +0400 Subject: [PATCH 15/20] avoid capturing changes in cdc table --- core/translate/insert.rs | 24 ++++++++++++++---------- core/translate/main_loop.rs | 29 ++++++++++++++++------------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/core/translate/insert.rs b/core/translate/insert.rs index cba811387..702a7a290 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -119,16 +119,20 @@ pub fn translate_insert( let cdc_table = program.capture_data_changes_mode().table(); let cdc_table = if let Some(cdc_table) = cdc_table { - let Some(turso_cdc_table) = schema.get_table(&cdc_table) else { - crate::bail_parse_error!("no such table: {}", cdc_table); - }; - let Some(cdc_btree) = turso_cdc_table.btree().clone() else { - crate::bail_parse_error!("no such table: {}", cdc_table); - }; - Some(( - program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())), - cdc_btree, - )) + if table.get_name() != cdc_table { + let Some(turso_cdc_table) = schema.get_table(&cdc_table) else { + crate::bail_parse_error!("no such table: {}", cdc_table); + }; + let Some(cdc_btree) = turso_cdc_table.btree().clone() else { + crate::bail_parse_error!("no such table: {}", cdc_table); + }; + Some(( + program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())), + cdc_btree, + )) + } else { + None + } } else { None }; diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index dcb04198f..252eb193a 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -125,20 +125,23 @@ pub fn init_loop( OperationMode::INSERT | OperationMode::UPDATE | OperationMode::DELETE ) { + assert!(tables.joined_tables().len() == 1); let cdc_table_name = cdc_table.unwrap(); - let Some(cdc_table) = t_ctx.resolver.schema.get_table(cdc_table_name) else { - crate::bail_parse_error!("no such table: {}", cdc_table_name); - }; - let Some(cdc_btree) = cdc_table.btree().clone() else { - crate::bail_parse_error!("no such table: {}", cdc_table_name); - }; - let cdc_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())); - program.emit_insn(Insn::OpenWrite { - cursor_id: cdc_cursor_id, - root_page: cdc_btree.root_page.into(), - name: cdc_btree.name.clone(), - }); - t_ctx.cdc_cursor_id = Some(cdc_cursor_id); + if tables.joined_tables()[0].table.get_name() != cdc_table_name { + let Some(cdc_table) = t_ctx.resolver.schema.get_table(cdc_table_name) else { + crate::bail_parse_error!("no such table: {}", cdc_table_name); + }; + let Some(cdc_btree) = cdc_table.btree().clone() else { + crate::bail_parse_error!("no such table: {}", cdc_table_name); + }; + let cdc_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())); + program.emit_insn(Insn::OpenWrite { + cursor_id: cdc_cursor_id, + root_page: cdc_btree.root_page.into(), + name: cdc_btree.name.clone(), + }); + t_ctx.cdc_cursor_id = Some(cdc_cursor_id); + } } // Initialize ephemeral indexes for distinct aggregates From 62c1e3880551e68adf0ace4cb5b01061ccc0fcbe Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Sun, 6 Jul 2025 22:26:34 +0400 Subject: [PATCH 16/20] small fixes --- core/translate/emitter.rs | 3 +-- core/translate/insert.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 47f4ae0b4..a32a4a901 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -25,7 +25,6 @@ use crate::function::Func; use crate::schema::Schema; use crate::translate::compound_select::emit_program_for_compound_select; use crate::translate::plan::{DeletePlan, Plan, QueryDestination, Search}; -use crate::translate::pragma::TURSO_CDC_TABLE_NAME; use crate::translate::values::emit_values; use crate::util::exprs_are_equivalent; use crate::vdbe::builder::{CursorKey, CursorType, ProgramBuilder}; @@ -1251,7 +1250,7 @@ pub fn emit_cdc_insns( key_reg: rowid_reg, record_reg, flag: InsertFlags::new(), - table_name: TURSO_CDC_TABLE_NAME.to_string(), + table_name: "".to_string(), }); Ok(()) } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 702a7a290..319b2e7ba 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -120,7 +120,7 @@ pub fn translate_insert( let cdc_table = program.capture_data_changes_mode().table(); let cdc_table = if let Some(cdc_table) = cdc_table { if table.get_name() != cdc_table { - let Some(turso_cdc_table) = schema.get_table(&cdc_table) else { + let Some(turso_cdc_table) = schema.get_table(cdc_table) else { crate::bail_parse_error!("no such table: {}", cdc_table); }; let Some(cdc_btree) = turso_cdc_table.btree().clone() else { From a10d423aacae0dfa41c1e67505c0bf6ca4dc0635 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Sun, 6 Jul 2025 22:30:57 +0400 Subject: [PATCH 17/20] adjust schema --- core/lib.rs | 4 ++-- core/translate/pragma.rs | 11 ++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index ca9354744..95660a29b 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -43,7 +43,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use crate::storage::{header_accessor, wal::DummyWAL}; use crate::translate::optimizer::optimize_plan; -use crate::translate::pragma::TURSO_CDC_TABLE_NAME; +use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME; use crate::util::{OpenMode, OpenOptions}; use crate::vtab::VirtualTable; use core::str; @@ -447,7 +447,7 @@ impl CaptureDataChangesMode { pub fn parse(value: &str) -> Result { let (mode, table) = value .split_once(",") - .unwrap_or((value, TURSO_CDC_TABLE_NAME)); + .unwrap_or((value, TURSO_CDC_DEFAULT_TABLE_NAME)); match mode { "off" => Ok(CaptureDataChangesMode::Off), "rowid-only" => Ok(CaptureDataChangesMode::RowidOnly { table: table.to_string() }), diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 7e8e1e0ee..d394d319c 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -465,7 +465,7 @@ fn update_cache_size( Ok(()) } -pub const TURSO_CDC_TABLE_NAME: &str = "turso_cdc"; +pub const TURSO_CDC_DEFAULT_TABLE_NAME: &str = "turso_cdc"; fn turso_cdc_table_columns() -> Vec { vec![ ast::ColumnDefinition { @@ -479,7 +479,7 @@ fn turso_cdc_table_columns() -> Vec { constraint: ast::ColumnConstraint::PrimaryKey { order: None, conflict_clause: None, - auto_increment: false, + auto_increment: true, }, }], }, @@ -508,11 +508,8 @@ fn turso_cdc_table_columns() -> Vec { constraints: vec![], }, ast::ColumnDefinition { - col_name: ast::Name("row_key".to_string()), - col_type: Some(ast::Type { - name: "BLOB".to_string(), - size: None, - }), + col_name: ast::Name("id".to_string()), + col_type: None, constraints: vec![], }, ] From 1ee475f04a53cf711d0bd7174eb8668f7958f94c Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Sun, 6 Jul 2025 22:32:42 +0400 Subject: [PATCH 18/20] rename pragma to unsable_capture_data_changes_conn --- core/pragma.rs | 2 +- core/translate/pragma.rs | 4 ++-- vendored/sqlite3-parser/src/parser/ast/mod.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/pragma.rs b/core/pragma.rs index 4f9ea1dbb..38d33a3fd 100644 --- a/core/pragma.rs +++ b/core/pragma.rs @@ -77,7 +77,7 @@ pub fn pragma_for(pragma: PragmaName) -> Pragma { PragmaFlags::NeedSchema | PragmaFlags::ReadOnly | PragmaFlags::Result0, &["message"], ), - CaptureDataChanges => Pragma::new( + UnstableCaptureDataChangesConn => Pragma::new( PragmaFlags::NeedSchema | PragmaFlags::Result0 | PragmaFlags::SchemaReq, &["mode", "table"], ), diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index d394d319c..f8912d3c5 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -208,7 +208,7 @@ fn update_pragma( Ok(program) } PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"), - PragmaName::CaptureDataChanges => { + PragmaName::UnstableCaptureDataChangesConn => { let value = parse_string(&value)?; // todo(sivukhin): ideally, we should consistently update capture_data_changes connection flag only after successfull execution of schema change statement // but for now, let's keep it as is... @@ -382,7 +382,7 @@ fn query_pragma( PragmaName::IntegrityCheck => { translate_integrity_check(schema, &mut program)?; } - PragmaName::CaptureDataChanges => { + PragmaName::UnstableCaptureDataChangesConn => { let pragma = pragma_for(pragma); let second_column = program.alloc_register(); let opts = connection.get_capture_data_changes(); diff --git a/vendored/sqlite3-parser/src/parser/ast/mod.rs b/vendored/sqlite3-parser/src/parser/ast/mod.rs index 25fdb6897..d844d159c 100644 --- a/vendored/sqlite3-parser/src/parser/ast/mod.rs +++ b/vendored/sqlite3-parser/src/parser/ast/mod.rs @@ -1747,8 +1747,6 @@ pub enum PragmaName { AutoVacuum, /// `cache_size` pragma CacheSize, - /// enable capture-changes logic for the connection - CaptureDataChanges, /// Run integrity check on the database file IntegrityCheck, /// `journal_mode` pragma @@ -1763,6 +1761,8 @@ pub enum PragmaName { SchemaVersion, /// returns information about the columns of a table TableInfo, + /// enable capture-changes logic for the connection + UnstableCaptureDataChangesConn, /// Returns the user version of the database file. UserVersion, /// trigger a checkpoint to run on database(s) if WAL is enabled From 3e7e66c0e7908bafba877add312da81f16a18e15 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 7 Jul 2025 12:44:50 +0400 Subject: [PATCH 19/20] add basic cdc tests --- tests/integration/functions/mod.rs | 1 + tests/integration/functions/test_cdc.rs | 557 ++++++++++++++++++++++++ 2 files changed, 558 insertions(+) create mode 100644 tests/integration/functions/test_cdc.rs diff --git a/tests/integration/functions/mod.rs b/tests/integration/functions/mod.rs index 66fcb1cb5..5a2372f24 100644 --- a/tests/integration/functions/mod.rs +++ b/tests/integration/functions/mod.rs @@ -1 +1,2 @@ mod test_function_rowid; +mod test_cdc; diff --git a/tests/integration/functions/test_cdc.rs b/tests/integration/functions/test_cdc.rs new file mode 100644 index 000000000..e69751b68 --- /dev/null +++ b/tests/integration/functions/test_cdc.rs @@ -0,0 +1,557 @@ +use rusqlite::types::Value; + +use crate::common::{limbo_exec_rows, TempDatabase}; + +fn replace_column_with_null(rows: Vec>, column: usize) -> Vec> { + rows.into_iter() + .map(|row| { + row.into_iter() + .enumerate() + .map(|(i, value)| if i == column { Value::Null } else { value }) + .collect() + }) + .collect() +} + +#[test] +fn test_cdc_simple() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (10, 10), (5, 1)") + .unwrap(); + let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(5), Value::Integer(1)], + vec![Value::Integer(10), Value::Integer(10)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(10) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(5) + ] + ] + ); +} + +#[test] +fn test_cdc_crud() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (20, 20), (10, 10), (5, 1)") + .unwrap(); + conn.execute("UPDATE t SET y = 100 WHERE x = 5").unwrap(); + conn.execute("DELETE FROM t WHERE x > 5").unwrap(); + conn.execute("INSERT INTO t VALUES (1, 1)").unwrap(); + conn.execute("UPDATE t SET x = 2 WHERE x = 1").unwrap(); + + let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(2), Value::Integer(1)], + vec![Value::Integer(5), Value::Integer(100)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(20) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(10) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(5) + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(0), + Value::Text("t".to_string()), + Value::Integer(5) + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(10) + ], + vec![ + Value::Integer(6), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(20) + ], + vec![ + Value::Integer(7), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(8), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(9), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + ] + ); +} + +#[test] +fn test_cdc_failed_op() { + let db = TempDatabase::new_empty(true); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1, 10), (2, 20)") + .unwrap(); + assert!(conn + .execute("INSERT INTO t VALUES (3, 30), (4, 40), (5, 10)") + .is_err()); + conn.execute("INSERT INTO t VALUES (6, 60), (7, 70)") + .unwrap(); + + let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + vec![Value::Integer(6), Value::Integer(60)], + vec![Value::Integer(7), Value::Integer(70)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(6) + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(7) + ], + ] + ); +} + +#[test] +fn test_cdc_uncaptured_connection() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); // captured + let conn2 = db.connect_limbo(); + conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap(); + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap(); // captured + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('off')") + .unwrap(); + conn2.execute("INSERT INTO t VALUES (5, 50)").unwrap(); + + conn1.execute("INSERT INTO t VALUES (6, 60)").unwrap(); // captured + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('off')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (7, 70)").unwrap(); + + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + vec![Value::Integer(3), Value::Integer(30)], + vec![Value::Integer(4), Value::Integer(40)], + vec![Value::Integer(5), Value::Integer(50)], + vec![Value::Integer(6), Value::Integer(60)], + vec![Value::Integer(7), Value::Integer(70)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(4) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(6) + ], + ] + ); +} + +#[test] +fn test_cdc_custom_table() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + ] + ); +} + +#[test] +fn test_cdc_ignore_changes_in_cdc_table() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + ] + ); + conn1 + .execute("DELETE FROM custom_cdc WHERE operation_id < 2") + .unwrap(); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1); + assert_eq!( + rows, + vec![vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ],] + ); +} + +#[test] +fn test_cdc_transaction() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .unwrap(); + conn1.execute("BEGIN").unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO q VALUES (2, 20)").unwrap(); + conn1.execute("INSERT INTO t VALUES (3, 30)").unwrap(); + conn1.execute("DELETE FROM t WHERE x = 1").unwrap(); + conn1.execute("UPDATE q SET y = 200 WHERE x = 2").unwrap(); + conn1.execute("COMMIT").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!(rows, vec![vec![Value::Integer(3), Value::Integer(30)],]); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM q"); + assert_eq!(rows, vec![vec![Value::Integer(2), Value::Integer(200)],]); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("q".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(3) + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(0), + Value::Text("q".to_string()), + Value::Integer(2) + ], + ] + ); +} + +#[test] +fn test_cdc_independent_connections() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + let conn2 = db.connect_limbo(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')") + .unwrap(); + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')") + .unwrap(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn2.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)] + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc1"), 1); + assert_eq!( + rows, + vec![vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ]] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc2"), 1); + assert_eq!( + rows, + vec![vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ]] + ); +} + +#[test] +fn test_cdc_independent_connections_different_cdc_not_ignore() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + let conn2 = db.connect_limbo(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')") + .unwrap(); + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')") + .unwrap(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap(); + conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap(); + conn1 + .execute("DELETE FROM custom_cdc2 WHERE operation_id < 2") + .unwrap(); + conn2 + .execute("DELETE FROM custom_cdc1 WHERE operation_id < 2") + .unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + vec![Value::Integer(3), Value::Integer(30)], + vec![Value::Integer(4), Value::Integer(40)], + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc1"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(-1), + Value::Text("custom_cdc2".to_string()), + Value::Integer(1) + ] + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn2, "SELECT * FROM custom_cdc2"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(4) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(-1), + Value::Text("custom_cdc1".to_string()), + Value::Integer(1) + ] + ] + ); +} From 1655c0b84fb80dbf38036872c2f2e5eb4cb6b3c0 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 7 Jul 2025 12:49:14 +0400 Subject: [PATCH 20/20] small fixes --- core/translate/emitter.rs | 13 ++++++------- tests/integration/functions/mod.rs | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index a32a4a901..14c8e6570 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -581,7 +581,7 @@ fn emit_delete_insns( OperationMode::DELETE, turso_cdc_cursor_id, rowid_reg, - &table_reference.identifier, // is it OK to use identifier here? + table_reference.table.get_name(), )?; } @@ -1108,7 +1108,7 @@ fn emit_update_insns( OperationMode::DELETE, cdc_cursor_id, rowid_reg, - &table_ref.identifier, // is it OK to use identifier here? + table_ref.table.get_name(), )?; program.emit_insn(Insn::Copy { src_reg: rowid_set_clause_reg.expect( @@ -1123,7 +1123,7 @@ fn emit_update_insns( OperationMode::INSERT, cdc_cursor_id, rowid_reg, - &table_ref.identifier, // is it OK to use identifier here? + table_ref.table.get_name(), )?; } else { program.emit_insn(Insn::Copy { @@ -1137,7 +1137,7 @@ fn emit_update_insns( OperationMode::UPDATE, cdc_cursor_id, rowid_reg, - &table_ref.identifier, // is it OK to use identifier here? + table_ref.table.get_name(), )?; } } @@ -1189,7 +1189,7 @@ pub fn emit_cdc_insns( rowid_reg: usize, table_name: &str, ) -> Result<()> { - // (operation_id INTEGER PRIMARY KEY, operation_time INTEGER, operation_type INTEGER, table_name TEXT, row_key BLOB) + // (operation_id INTEGER PRIMARY KEY AUTOINCREMENT, operation_time INTEGER, operation_type INTEGER, table_name TEXT, id) let turso_cdc_registers = program.alloc_registers(5); program.emit_insn(Insn::Null { dest: turso_cdc_registers, @@ -1230,11 +1230,10 @@ pub fn emit_cdc_insns( }); let rowid_reg = program.alloc_register(); - // todo(sivukhin): we **must** guarantee sequential generation or operation_id column program.emit_insn(Insn::NewRowid { cursor: cdc_cursor_id, rowid_reg, - prev_largest_reg: 0, + prev_largest_reg: 0, // todo(sivukhin): properly set value here from sqlite_sequence table when AUTOINCREMENT will be properly implemented in Turso }); let record_reg = program.alloc_register(); diff --git a/tests/integration/functions/mod.rs b/tests/integration/functions/mod.rs index 5a2372f24..52b82a1c1 100644 --- a/tests/integration/functions/mod.rs +++ b/tests/integration/functions/mod.rs @@ -1,2 +1,2 @@ -mod test_function_rowid; mod test_cdc; +mod test_function_rowid;