diff --git a/core/lib.rs b/core/lib.rs index 4067aac15..95660a29b 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -43,6 +43,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use crate::storage::{header_accessor, wal::DummyWAL}; use crate::translate::optimizer::optimize_plan; +use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME; use crate::util::{OpenMode, OpenOptions}; use crate::vtab::VirtualTable; use core::str; @@ -278,6 +279,7 @@ impl Database { cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), + capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), }); if let Err(e) = conn.register_builtins() { return Err(LimboError::ExtensionError(e)); @@ -330,6 +332,7 @@ impl Database { cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), + capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), }); if let Err(e) = conn.register_builtins() { @@ -434,6 +437,39 @@ fn get_schema_version(conn: &Arc, io: &Arc) -> Result { } } +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum CaptureDataChangesMode { + Off, + RowidOnly { table: String }, +} + +impl CaptureDataChangesMode { + pub fn parse(value: &str) -> Result { + let (mode, table) = value + .split_once(",") + .unwrap_or((value, TURSO_CDC_DEFAULT_TABLE_NAME)); + match mode { + "off" => Ok(CaptureDataChangesMode::Off), + "rowid-only" => Ok(CaptureDataChangesMode::RowidOnly { table: table.to_string() }), + _ => Err(LimboError::InvalidArgument( + "unexpected pragma value: expected '' or ',' parameter where mode is one of off|rowid-only".to_string(), + )) + } + } + pub fn mode_name(&self) -> &str { + match self { + CaptureDataChangesMode::Off => "off", + CaptureDataChangesMode::RowidOnly { .. } => "rowid-only", + } + } + pub fn table(&self) -> Option<&str> { + match self { + CaptureDataChangesMode::Off => None, + CaptureDataChangesMode::RowidOnly { table } => Some(table.as_str()), + } + } +} + pub struct Connection { _db: Arc, pager: Rc, @@ -450,6 +486,7 @@ pub struct Connection { cache_size: Cell, readonly: Cell, wal_checkpoint_disabled: Cell, + capture_data_changes: RefCell, } impl Connection { @@ -724,6 +761,13 @@ impl Connection { self.cache_size.set(size); } + pub fn get_capture_data_changes(&self) -> std::cell::Ref<'_, CaptureDataChangesMode> { + self.capture_data_changes.borrow() + } + pub fn set_capture_data_changes(&self, opts: CaptureDataChangesMode) { + self.capture_data_changes.replace(opts); + } + #[cfg(feature = "fs")] pub fn open_new(&self, path: &str, vfs: &str) -> Result<(Arc, Arc)> { Database::open_with_vfs(&self._db, path, vfs) diff --git a/core/pragma.rs b/core/pragma.rs index a65aefff4..38d33a3fd 100644 --- a/core/pragma.rs +++ b/core/pragma.rs @@ -7,21 +7,21 @@ use turso_sqlite3_parser::ast::PragmaName; bitflags! { // Flag names match those used in SQLite: // https://github.com/sqlite/sqlite/blob/b3c1884b65400da85636458298bd77cbbfdfb401/tool/mkpragmatab.tcl#L22-L29 - struct PragmaFlags: u8 { - const NeedSchema = 0x01; - const NoColumns = 0x02; - const NoColumns1 = 0x04; - const ReadOnly = 0x08; - const Result0 = 0x10; - const Result1 = 0x20; - const SchemaOpt = 0x40; - const SchemaReq = 0x80; + pub struct PragmaFlags: u8 { + const NeedSchema = 0x01; /* Force schema load before running */ + const NoColumns = 0x02; /* OP_ResultRow called with zero columns */ + const NoColumns1 = 0x04; /* zero columns if RHS argument is present */ + const ReadOnly = 0x08; /* Read-only HEADER_VALUE */ + const Result0 = 0x10; /* Acts as query when no argument */ + const Result1 = 0x20; /* Acts as query when has one argument */ + const SchemaOpt = 0x40; /* Schema restricts name search if present */ + const SchemaReq = 0x80; /* Schema required - "main" is default */ } } -struct Pragma { - flags: PragmaFlags, - columns: &'static [&'static str], +pub struct Pragma { + pub flags: PragmaFlags, + pub columns: &'static [&'static str], } impl Pragma { @@ -30,7 +30,7 @@ impl Pragma { } } -fn pragma_for(pragma: PragmaName) -> Pragma { +pub fn pragma_for(pragma: PragmaName) -> Pragma { use PragmaName::*; match pragma { @@ -77,6 +77,10 @@ fn pragma_for(pragma: PragmaName) -> Pragma { PragmaFlags::NeedSchema | PragmaFlags::ReadOnly | PragmaFlags::Result0, &["message"], ), + UnstableCaptureDataChangesConn => Pragma::new( + PragmaFlags::NeedSchema | PragmaFlags::Result0 | PragmaFlags::SchemaReq, + &["mode", "table"], + ), } } diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 17540ac3a..14c8e6570 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -31,7 +31,7 @@ use crate::vdbe::builder::{CursorKey, CursorType, ProgramBuilder}; use crate::vdbe::insn::{CmpInsFlags, IdxInsertFlags, InsertFlags, RegisterOrLiteral}; use crate::vdbe::CursorID; use crate::vdbe::{insn::Insn, BranchOffset}; -use crate::{Result, SymbolTable}; +use crate::{bail_parse_error, Result, SymbolTable}; pub struct Resolver<'a> { pub schema: &'a Schema, @@ -149,6 +149,8 @@ pub struct TranslateCtx<'a> { /// - First: all `GROUP BY` expressions, in the order they appear in the `GROUP BY` clause. /// - Then: remaining non-aggregate expressions that are not part of `GROUP BY`. pub non_aggregate_expressions: Vec<(&'a Expr, bool)>, + /// Cursor id for turso_cdc table (if capture_data_changes=on is set and query can modify the data) + pub cdc_cursor_id: Option, } impl<'a> TranslateCtx<'a> { @@ -175,6 +177,7 @@ impl<'a> TranslateCtx<'a> { result_columns_to_skip_in_orderby_sorter: None, resolver: Resolver::new(schema, syms), non_aggregate_expressions: Vec::new(), + cdc_cursor_id: None, } } } @@ -566,6 +569,22 @@ fn emit_delete_insns( } } + if let Some(turso_cdc_cursor_id) = t_ctx.cdc_cursor_id { + let rowid_reg = program.alloc_register(); + program.emit_insn(Insn::RowId { + cursor_id: main_table_cursor_id, + dest: rowid_reg, + }); + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::DELETE, + turso_cdc_cursor_id, + rowid_reg, + table_reference.table.get_name(), + )?; + } + program.emit_insn(Insn::Delete { cursor_id: main_table_cursor_id, }); @@ -1076,6 +1095,53 @@ fn emit_update_insns( }); } + if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id { + let rowid_reg = program.alloc_register(); + if has_user_provided_rowid { + program.emit_insn(Insn::RowId { + cursor_id, + dest: rowid_reg, + }); + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::DELETE, + cdc_cursor_id, + rowid_reg, + table_ref.table.get_name(), + )?; + program.emit_insn(Insn::Copy { + src_reg: rowid_set_clause_reg.expect( + "rowid_set_clause_reg must be set because has_user_provided_rowid is true", + ), + dst_reg: rowid_reg, + amount: 1, + }); + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::INSERT, + cdc_cursor_id, + rowid_reg, + table_ref.table.get_name(), + )?; + } else { + program.emit_insn(Insn::Copy { + src_reg: rowid_set_clause_reg.unwrap_or(beg), + dst_reg: rowid_reg, + amount: 1, + }); + emit_cdc_insns( + program, + &t_ctx.resolver, + OperationMode::UPDATE, + cdc_cursor_id, + rowid_reg, + table_ref.table.get_name(), + )?; + } + } + // If we are updating the rowid, we cannot rely on overwrite on the // Insert instruction to update the cell. We need to first delete the current cell // and later insert the updated record @@ -1115,6 +1181,79 @@ fn emit_update_insns( Ok(()) } +pub fn emit_cdc_insns( + program: &mut ProgramBuilder, + resolver: &Resolver, + operation_mode: OperationMode, + cdc_cursor_id: usize, + rowid_reg: usize, + table_name: &str, +) -> Result<()> { + // (operation_id INTEGER PRIMARY KEY AUTOINCREMENT, operation_time INTEGER, operation_type INTEGER, table_name TEXT, id) + let turso_cdc_registers = program.alloc_registers(5); + program.emit_insn(Insn::Null { + dest: turso_cdc_registers, + dest_end: None, + }); + program.mark_last_insn_constant(); + + let Some(unixepoch_fn) = resolver.resolve_function("unixepoch", 0) else { + bail_parse_error!("no function {}", "unixepoch"); + }; + let unixepoch_fn_ctx = crate::function::FuncCtx { + func: unixepoch_fn, + arg_count: 0, + }; + + program.emit_insn(Insn::Function { + constant_mask: 0, + start_reg: 0, + dest: turso_cdc_registers + 1, + func: unixepoch_fn_ctx, + }); + + let operation_type = match operation_mode { + OperationMode::INSERT => 1, + OperationMode::UPDATE | OperationMode::SELECT => 0, + OperationMode::DELETE => -1, + }; + program.emit_int(operation_type, turso_cdc_registers + 2); + program.mark_last_insn_constant(); + + program.emit_string8(table_name.to_string(), turso_cdc_registers + 3); + program.mark_last_insn_constant(); + + program.emit_insn(Insn::Copy { + src_reg: rowid_reg, + dst_reg: turso_cdc_registers + 4, + amount: 0, + }); + + let rowid_reg = program.alloc_register(); + program.emit_insn(Insn::NewRowid { + cursor: cdc_cursor_id, + rowid_reg, + prev_largest_reg: 0, // todo(sivukhin): properly set value here from sqlite_sequence table when AUTOINCREMENT will be properly implemented in Turso + }); + + let record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: turso_cdc_registers, + count: 5, + dest_reg: record_reg, + index_name: None, + }); + + program.emit_insn(Insn::Insert { + cursor: cdc_cursor_id, + key_reg: rowid_reg, + record_reg, + flag: InsertFlags::new(), + table_name: "".to_string(), + }); + Ok(()) +} + /// Initialize the limit/offset counters and registers. /// In case of compound SELECTs, the limit counter is initialized only once, /// hence [LimitCtx::initialize_counter] being false in those cases. diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 872b891c7..319b2e7ba 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -6,6 +6,7 @@ use turso_sqlite3_parser::ast::{ use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY}; use crate::schema::{IndexColumn, Table}; +use crate::translate::emitter::{emit_cdc_insns, OperationMode}; use crate::util::normalize_ident; use crate::vdbe::builder::ProgramBuilderOpts; use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral}; @@ -116,6 +117,26 @@ pub fn translate_insert( let halt_label = program.allocate_label(); let loop_start_label = program.allocate_label(); + let cdc_table = program.capture_data_changes_mode().table(); + let cdc_table = if let Some(cdc_table) = cdc_table { + if table.get_name() != cdc_table { + let Some(turso_cdc_table) = schema.get_table(cdc_table) else { + crate::bail_parse_error!("no such table: {}", cdc_table); + }; + let Some(cdc_btree) = turso_cdc_table.btree().clone() else { + crate::bail_parse_error!("no such table: {}", cdc_table); + }; + Some(( + program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())), + cdc_btree, + )) + } else { + None + } + } else { + None + }; + let mut yield_reg_opt = None; let mut temp_table_ctx = None; let (num_values, cursor_id) = match body { @@ -328,6 +349,15 @@ pub fn translate_insert( &resolver, )?; } + // Open turso_cdc table btree for writing if necessary + if let Some((cdc_cursor_id, cdc_btree)) = &cdc_table { + program.emit_insn(Insn::OpenWrite { + cursor_id: *cdc_cursor_id, + root_page: cdc_btree.root_page.into(), + name: cdc_btree.name.clone(), + }); + } + // Open all the index btrees for writing for idx_cursor in idx_cursors.iter() { program.emit_insn(Insn::OpenWrite { @@ -414,6 +444,18 @@ pub fn translate_insert( _ => (), } + // Write record to the turso_cdc table if necessary + if let Some((cdc_cursor_id, _)) = &cdc_table { + emit_cdc_insns( + &mut program, + &resolver, + OperationMode::INSERT, + *cdc_cursor_id, + rowid_reg, + &table_name.0, + )?; + } + let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?; for index_col_mapping in index_col_mappings { // find which cursor we opened earlier for this index diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index 88c18c054..252eb193a 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -117,6 +117,33 @@ pub fn init_loop( t_ctx.meta_left_joins.len() == tables.joined_tables().len(), "meta_left_joins length does not match tables length" ); + + let cdc_table = program.capture_data_changes_mode().table(); + if cdc_table.is_some() + && matches!( + mode, + OperationMode::INSERT | OperationMode::UPDATE | OperationMode::DELETE + ) + { + assert!(tables.joined_tables().len() == 1); + let cdc_table_name = cdc_table.unwrap(); + if tables.joined_tables()[0].table.get_name() != cdc_table_name { + let Some(cdc_table) = t_ctx.resolver.schema.get_table(cdc_table_name) else { + crate::bail_parse_error!("no such table: {}", cdc_table_name); + }; + let Some(cdc_btree) = cdc_table.btree().clone() else { + crate::bail_parse_error!("no such table: {}", cdc_table_name); + }; + let cdc_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())); + program.emit_insn(Insn::OpenWrite { + cursor_id: cdc_cursor_id, + root_page: cdc_btree.root_page.into(), + name: cdc_btree.name.clone(), + }); + t_ctx.cdc_cursor_id = Some(cdc_cursor_id); + } + } + // Initialize ephemeral indexes for distinct aggregates for (i, agg) in aggregates .iter_mut() diff --git a/core/translate/mod.rs b/core/translate/mod.rs index b7c82d585..6a45e96f3 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -75,6 +75,7 @@ pub fn translate( let mut program = ProgramBuilder::new( query_mode, + connection.get_capture_data_changes().clone(), // These options will be extended whithin each translate program ProgramBuilderOpts { num_cursors: 1, diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 93883a14b..f8912d3c5 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -3,17 +3,19 @@ use std::rc::Rc; use std::sync::Arc; -use turso_sqlite3_parser::ast::PragmaName; -use turso_sqlite3_parser::ast::{self, Expr}; +use turso_sqlite3_parser::ast::{self, ColumnDefinition, Expr}; +use turso_sqlite3_parser::ast::{PragmaName, QualifiedName}; +use crate::pragma::pragma_for; use crate::schema::Schema; use crate::storage::pager::AutoVacuumMode; use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE; use crate::storage::wal::CheckpointMode; -use crate::util::{normalize_ident, parse_signed_number}; +use crate::translate::schema::translate_create_table; +use crate::util::{normalize_ident, parse_signed_number, parse_string}; use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts}; use crate::vdbe::insn::{Cookie, Insn}; -use crate::{bail_parse_error, storage, LimboError, Value}; +use crate::{bail_parse_error, storage, CaptureDataChangesMode, LimboError, Value}; use std::str::FromStr; use strum::IntoEnumIterator; @@ -57,17 +59,15 @@ pub fn translate_pragma( Err(_) => bail_parse_error!("Not a valid pragma name"), }; - match body { - None => { - query_pragma(pragma, schema, None, pager, connection, &mut program)?; - } + let mut program = match body { + None => query_pragma(pragma, schema, None, pager, connection, program)?, Some(ast::PragmaBody::Equals(value) | ast::PragmaBody::Call(value)) => match pragma { PragmaName::TableInfo => { - query_pragma(pragma, schema, Some(value), pager, connection, &mut program)?; + query_pragma(pragma, schema, Some(value), pager, connection, program)? } _ => { write = true; - update_pragma(pragma, schema, value, pager, connection, &mut program)?; + update_pragma(pragma, schema, value, pager, connection, program)? } }, }; @@ -85,8 +85,8 @@ fn update_pragma( value: ast::Expr, pager: Rc, connection: Arc, - program: &mut ProgramBuilder, -) -> crate::Result<()> { + mut program: ProgramBuilder, +) -> crate::Result { match pragma { PragmaName::CacheSize => { let cache_size = match parse_signed_number(&value)? { @@ -95,42 +95,33 @@ fn update_pragma( _ => bail_parse_error!("Invalid value for cache size pragma"), }; update_cache_size(cache_size, pager, connection)?; - Ok(()) - } - PragmaName::JournalMode => { - query_pragma( - PragmaName::JournalMode, - schema, - None, - pager, - connection, - program, - )?; - Ok(()) - } - PragmaName::LegacyFileFormat => Ok(()), - PragmaName::WalCheckpoint => { - query_pragma( - PragmaName::WalCheckpoint, - schema, - Some(value), - pager, - connection, - program, - )?; - Ok(()) - } - PragmaName::PageCount => { - query_pragma( - PragmaName::PageCount, - schema, - None, - pager, - connection, - program, - )?; - Ok(()) + Ok(program) } + PragmaName::JournalMode => query_pragma( + PragmaName::JournalMode, + schema, + None, + pager, + connection, + program, + ), + PragmaName::LegacyFileFormat => Ok(program), + PragmaName::WalCheckpoint => query_pragma( + PragmaName::WalCheckpoint, + schema, + Some(value), + pager, + connection, + program, + ), + PragmaName::PageCount => query_pragma( + PragmaName::PageCount, + schema, + None, + pager, + connection, + program, + ), PragmaName::UserVersion => { let data = parse_signed_number(&value)?; let version_value = match data { @@ -145,7 +136,7 @@ fn update_pragma( value: version_value, p5: 1, }); - Ok(()) + Ok(program) } PragmaName::SchemaVersion => { // TODO: Implement updating schema_version @@ -214,9 +205,33 @@ fn update_pragma( value: auto_vacuum_mode - 1, p5: 0, }); - Ok(()) + Ok(program) } PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"), + PragmaName::UnstableCaptureDataChangesConn => { + let value = parse_string(&value)?; + // todo(sivukhin): ideally, we should consistently update capture_data_changes connection flag only after successfull execution of schema change statement + // but for now, let's keep it as is... + let opts = CaptureDataChangesMode::parse(&value)?; + if let Some(table) = &opts.table() { + // make sure that we have table created + program = translate_create_table( + QualifiedName::single(ast::Name(table.to_string())), + false, + ast::CreateTableBody::columns_and_constraints_from_definition( + turso_cdc_table_columns(), + None, + ast::TableOptions::NONE, + ) + .unwrap(), + true, + schema, + program, + )?; + } + connection.set_capture_data_changes(opts); + Ok(program) + } } } @@ -226,8 +241,8 @@ fn query_pragma( value: Option, pager: Rc, connection: Arc, - program: &mut ProgramBuilder, -) -> crate::Result<()> { + mut program: ProgramBuilder, +) -> crate::Result { let register = program.alloc_register(); match pragma { PragmaName::CacheSize => { @@ -365,11 +380,25 @@ fn query_pragma( program.emit_result_row(register, 1); } PragmaName::IntegrityCheck => { - translate_integrity_check(schema, program)?; + translate_integrity_check(schema, &mut program)?; + } + PragmaName::UnstableCaptureDataChangesConn => { + let pragma = pragma_for(pragma); + let second_column = program.alloc_register(); + let opts = connection.get_capture_data_changes(); + program.emit_string8(opts.mode_name().to_string(), register); + if let Some(table) = &opts.table() { + program.emit_string8(table.to_string(), second_column); + } else { + program.emit_null(second_column, None); + } + program.emit_result_row(register, 2); + program.add_pragma_result_column(pragma.columns[0].to_string()); + program.add_pragma_result_column(pragma.columns[1].to_string()); } } - Ok(()) + Ok(program) } fn update_auto_vacuum_mode( @@ -435,3 +464,53 @@ fn update_cache_size( Ok(()) } + +pub const TURSO_CDC_DEFAULT_TABLE_NAME: &str = "turso_cdc"; +fn turso_cdc_table_columns() -> Vec { + vec![ + ast::ColumnDefinition { + col_name: ast::Name("operation_id".to_string()), + col_type: Some(ast::Type { + name: "INTEGER".to_string(), + size: None, + }), + constraints: vec![ast::NamedColumnConstraint { + name: None, + constraint: ast::ColumnConstraint::PrimaryKey { + order: None, + conflict_clause: None, + auto_increment: true, + }, + }], + }, + ast::ColumnDefinition { + col_name: ast::Name("operation_time".to_string()), + col_type: Some(ast::Type { + name: "INTEGER".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("operation_type".to_string()), + col_type: Some(ast::Type { + name: "INTEGER".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("table_name".to_string()), + col_type: Some(ast::Type { + name: "TEXT".to_string(), + size: None, + }), + constraints: vec![], + }, + ast::ColumnDefinition { + col_name: ast::Name("id".to_string()), + col_type: None, + constraints: vec![], + }, + ] +} diff --git a/core/translate/subquery.rs b/core/translate/subquery.rs index 4004a7cd1..645c95f6e 100644 --- a/core/translate/subquery.rs +++ b/core/translate/subquery.rs @@ -82,6 +82,7 @@ pub fn emit_subquery( reg_limit_offset_sum: None, resolver: Resolver::new(t_ctx.resolver.schema, t_ctx.resolver.symbol_table), non_aggregate_expressions: Vec::new(), + cdc_cursor_id: None, }; let subquery_body_end_label = program.allocate_label(); program.emit_insn(Insn::InitCoroutine { diff --git a/core/util.rs b/core/util.rs index 1415bcfe7..d25cac606 100644 --- a/core/util.rs +++ b/core/util.rs @@ -1044,6 +1044,41 @@ pub fn parse_signed_number(expr: &Expr) -> Result { } } +pub fn parse_string(expr: &Expr) -> Result { + match expr { + Expr::Name(ast::Name(s)) if s.len() >= 2 && s.starts_with("'") && s.ends_with("'") => { + Ok(s[1..s.len() - 1].to_string()) + } + _ => Err(LimboError::InvalidArgument(format!( + "string parameter expected, got {:?} instead", + expr + ))), + } +} + +#[allow(unused)] +pub fn parse_pragma_bool(expr: &Expr) -> Result { + const TRUE_VALUES: &[&str] = &["yes", "true", "on"]; + const FALSE_VALUES: &[&str] = &["no", "false", "off"]; + if let Ok(number) = parse_signed_number(expr) { + if let Value::Integer(x @ (0 | 1)) = number { + return Ok(x != 0); + } + } else if let Expr::Name(name) = expr { + let ident = normalize_ident(&name.0); + if TRUE_VALUES.contains(&ident.as_str()) { + return Ok(true); + } + if FALSE_VALUES.contains(&ident.as_str()) { + return Ok(false); + } + } + Err(LimboError::InvalidArgument( + "boolean pragma value must be either 0|1 integer or yes|true|on|no|false|off token" + .to_string(), + )) +} + // for TVF's we need these at planning time so we cannot emit translate_expr pub fn vtable_args(args: &[ast::Expr]) -> Vec { let mut vtable_args = Vec::new(); @@ -1076,7 +1111,7 @@ pub fn vtable_args(args: &[ast::Expr]) -> Vec { #[cfg(test)] pub mod tests { use super::*; - use turso_sqlite3_parser::ast::{self, Expr, Id, Literal, Operator::*, Type}; + use turso_sqlite3_parser::ast::{self, Expr, Id, Literal, Name, Operator::*, Type}; #[test] fn test_normalize_ident() { @@ -2031,4 +2066,21 @@ pub mod tests { Value::Float(-9.223_372_036_854_776e18) ); } + + #[test] + fn test_parse_pragma_bool() { + assert!(parse_pragma_bool(&Expr::Literal(Literal::Numeric("1".into()))).unwrap(),); + assert!(parse_pragma_bool(&Expr::Name(Name("true".into()))).unwrap(),); + assert!(parse_pragma_bool(&Expr::Name(Name("on".into()))).unwrap(),); + assert!(parse_pragma_bool(&Expr::Name(Name("yes".into()))).unwrap(),); + + assert!(!parse_pragma_bool(&Expr::Literal(Literal::Numeric("0".into()))).unwrap(),); + assert!(!parse_pragma_bool(&Expr::Name(Name("false".into()))).unwrap(),); + assert!(!parse_pragma_bool(&Expr::Name(Name("off".into()))).unwrap(),); + assert!(!parse_pragma_bool(&Expr::Name(Name("no".into()))).unwrap(),); + + assert!(parse_pragma_bool(&Expr::Name(Name("nono".into()))).is_err()); + assert!(parse_pragma_bool(&Expr::Name(Name("10".into()))).is_err()); + assert!(parse_pragma_bool(&Expr::Name(Name("-1".into()))).is_err()); + } } diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 7d45c5f4b..620c8cb19 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -12,7 +12,7 @@ use crate::{ emitter::TransactionMode, plan::{ResultSetColumn, TableReferences}, }, - Connection, Value, VirtualTable, + CaptureDataChangesMode, Connection, Value, VirtualTable, }; #[derive(Default)] @@ -110,6 +110,7 @@ pub struct ProgramBuilder { nested_level: usize, init_label: BranchOffset, start_offset: BranchOffset, + capture_data_changes_mode: CaptureDataChangesMode, } #[derive(Debug, Clone)] @@ -149,7 +150,11 @@ pub struct ProgramBuilderOpts { } impl ProgramBuilder { - pub fn new(query_mode: QueryMode, opts: ProgramBuilderOpts) -> Self { + pub fn new( + query_mode: QueryMode, + capture_data_changes_mode: CaptureDataChangesMode, + opts: ProgramBuilderOpts, + ) -> Self { Self { table_reference_counter: TableRefIdCounter::new(), next_free_register: 1, @@ -172,9 +177,14 @@ impl ProgramBuilder { // These labels will be filled when `prologue()` is called init_label: BranchOffset::Placeholder, start_offset: BranchOffset::Placeholder, + capture_data_changes_mode, } } + pub fn capture_data_changes_mode(&self) -> &CaptureDataChangesMode { + &self.capture_data_changes_mode + } + pub fn extend(&mut self, opts: &ProgramBuilderOpts) { self.insns.reserve(opts.approx_num_insns); self.cursor_ref.reserve(opts.num_cursors); diff --git a/tests/integration/functions/mod.rs b/tests/integration/functions/mod.rs index 66fcb1cb5..52b82a1c1 100644 --- a/tests/integration/functions/mod.rs +++ b/tests/integration/functions/mod.rs @@ -1 +1,2 @@ +mod test_cdc; mod test_function_rowid; diff --git a/tests/integration/functions/test_cdc.rs b/tests/integration/functions/test_cdc.rs new file mode 100644 index 000000000..e69751b68 --- /dev/null +++ b/tests/integration/functions/test_cdc.rs @@ -0,0 +1,557 @@ +use rusqlite::types::Value; + +use crate::common::{limbo_exec_rows, TempDatabase}; + +fn replace_column_with_null(rows: Vec>, column: usize) -> Vec> { + rows.into_iter() + .map(|row| { + row.into_iter() + .enumerate() + .map(|(i, value)| if i == column { Value::Null } else { value }) + .collect() + }) + .collect() +} + +#[test] +fn test_cdc_simple() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (10, 10), (5, 1)") + .unwrap(); + let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(5), Value::Integer(1)], + vec![Value::Integer(10), Value::Integer(10)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(10) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(5) + ] + ] + ); +} + +#[test] +fn test_cdc_crud() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (20, 20), (10, 10), (5, 1)") + .unwrap(); + conn.execute("UPDATE t SET y = 100 WHERE x = 5").unwrap(); + conn.execute("DELETE FROM t WHERE x > 5").unwrap(); + conn.execute("INSERT INTO t VALUES (1, 1)").unwrap(); + conn.execute("UPDATE t SET x = 2 WHERE x = 1").unwrap(); + + let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(2), Value::Integer(1)], + vec![Value::Integer(5), Value::Integer(100)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(20) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(10) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(5) + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(0), + Value::Text("t".to_string()), + Value::Integer(5) + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(10) + ], + vec![ + Value::Integer(6), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(20) + ], + vec![ + Value::Integer(7), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(8), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(9), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + ] + ); +} + +#[test] +fn test_cdc_failed_op() { + let db = TempDatabase::new_empty(true); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1, 10), (2, 20)") + .unwrap(); + assert!(conn + .execute("INSERT INTO t VALUES (3, 30), (4, 40), (5, 10)") + .is_err()); + conn.execute("INSERT INTO t VALUES (6, 60), (7, 70)") + .unwrap(); + + let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + vec![Value::Integer(6), Value::Integer(60)], + vec![Value::Integer(7), Value::Integer(70)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(6) + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(7) + ], + ] + ); +} + +#[test] +fn test_cdc_uncaptured_connection() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); // captured + let conn2 = db.connect_limbo(); + conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap(); + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap(); // captured + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('off')") + .unwrap(); + conn2.execute("INSERT INTO t VALUES (5, 50)").unwrap(); + + conn1.execute("INSERT INTO t VALUES (6, 60)").unwrap(); // captured + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('off')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (7, 70)").unwrap(); + + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + vec![Value::Integer(3), Value::Integer(30)], + vec![Value::Integer(4), Value::Integer(40)], + vec![Value::Integer(5), Value::Integer(50)], + vec![Value::Integer(6), Value::Integer(60)], + vec![Value::Integer(7), Value::Integer(70)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(4) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(6) + ], + ] + ); +} + +#[test] +fn test_cdc_custom_table() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + ] + ); +} + +#[test] +fn test_cdc_ignore_changes_in_cdc_table() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + ] + ); + conn1 + .execute("DELETE FROM custom_cdc WHERE operation_id < 2") + .unwrap(); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1); + assert_eq!( + rows, + vec![vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ],] + ); +} + +#[test] +fn test_cdc_transaction() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .unwrap(); + conn1.execute("BEGIN").unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO q VALUES (2, 20)").unwrap(); + conn1.execute("INSERT INTO t VALUES (3, 30)").unwrap(); + conn1.execute("DELETE FROM t WHERE x = 1").unwrap(); + conn1.execute("UPDATE q SET y = 200 WHERE x = 2").unwrap(); + conn1.execute("COMMIT").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!(rows, vec![vec![Value::Integer(3), Value::Integer(30)],]); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM q"); + assert_eq!(rows, vec![vec![Value::Integer(2), Value::Integer(200)],]); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("q".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(3) + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(0), + Value::Text("q".to_string()), + Value::Integer(2) + ], + ] + ); +} + +#[test] +fn test_cdc_independent_connections() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + let conn2 = db.connect_limbo(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')") + .unwrap(); + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')") + .unwrap(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn2.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)] + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc1"), 1); + assert_eq!( + rows, + vec![vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ]] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc2"), 1); + assert_eq!( + rows, + vec![vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ]] + ); +} + +#[test] +fn test_cdc_independent_connections_different_cdc_not_ignore() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + let conn2 = db.connect_limbo(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')") + .unwrap(); + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')") + .unwrap(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap(); + conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap(); + conn1 + .execute("DELETE FROM custom_cdc2 WHERE operation_id < 2") + .unwrap(); + conn2 + .execute("DELETE FROM custom_cdc1 WHERE operation_id < 2") + .unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + vec![Value::Integer(3), Value::Integer(30)], + vec![Value::Integer(4), Value::Integer(40)], + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc1"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(-1), + Value::Text("custom_cdc2".to_string()), + Value::Integer(1) + ] + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn2, "SELECT * FROM custom_cdc2"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(4) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(-1), + Value::Text("custom_cdc1".to_string()), + Value::Integer(1) + ] + ] + ); +} diff --git a/vendored/sqlite3-parser/src/parser/ast/mod.rs b/vendored/sqlite3-parser/src/parser/ast/mod.rs index 956fa88bd..d844d159c 100644 --- a/vendored/sqlite3-parser/src/parser/ast/mod.rs +++ b/vendored/sqlite3-parser/src/parser/ast/mod.rs @@ -1362,6 +1362,23 @@ impl CreateTableBody { options, }) } + + /// Constructor from Vec of column definition + pub fn columns_and_constraints_from_definition( + columns_vec: Vec, + constraints: Option>, + options: TableOptions, + ) -> Result { + let mut columns = IndexMap::new(); + for def in columns_vec { + columns.insert(def.col_name.clone(), def); + } + Ok(Self::ColumnsAndConstraints { + columns, + constraints, + options, + }) + } } /// Table column definition @@ -1744,6 +1761,8 @@ pub enum PragmaName { SchemaVersion, /// returns information about the columns of a table TableInfo, + /// enable capture-changes logic for the connection + UnstableCaptureDataChangesConn, /// Returns the user version of the database file. UserVersion, /// trigger a checkpoint to run on database(s) if WAL is enabled