diff --git a/core/lib.rs b/core/lib.rs index 0bd2fa435..ca9354744 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -43,6 +43,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use crate::storage::{header_accessor, wal::DummyWAL}; use crate::translate::optimizer::optimize_plan; +use crate::translate::pragma::TURSO_CDC_TABLE_NAME; use crate::util::{OpenMode, OpenOptions}; use crate::vtab::VirtualTable; use core::str; @@ -278,7 +279,7 @@ impl Database { cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), - capture_data_changes: Cell::new(false), + capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), }); if let Err(e) = conn.register_builtins() { return Err(LimboError::ExtensionError(e)); @@ -331,7 +332,7 @@ impl Database { cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), wal_checkpoint_disabled: Cell::new(false), - capture_data_changes: Cell::new(false), + capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), }); if let Err(e) = conn.register_builtins() { @@ -436,6 +437,39 @@ fn get_schema_version(conn: &Arc, io: &Arc) -> Result { } } +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum CaptureDataChangesMode { + Off, + RowidOnly { table: String }, +} + +impl CaptureDataChangesMode { + pub fn parse(value: &str) -> Result { + let (mode, table) = value + .split_once(",") + .unwrap_or((value, TURSO_CDC_TABLE_NAME)); + match mode { + "off" => Ok(CaptureDataChangesMode::Off), + "rowid-only" => Ok(CaptureDataChangesMode::RowidOnly { table: table.to_string() }), + _ => Err(LimboError::InvalidArgument( + "unexpected pragma value: expected '' or ',' parameter where mode is one of off|rowid-only".to_string(), + )) + } + } + pub fn mode_name(&self) -> &str { + match self { + CaptureDataChangesMode::Off => "off", + CaptureDataChangesMode::RowidOnly { .. } => "rowid-only", + } + } + pub fn table(&self) -> Option<&str> { + match self { + CaptureDataChangesMode::Off => None, + CaptureDataChangesMode::RowidOnly { table } => Some(table.as_str()), + } + } +} + pub struct Connection { _db: Arc, pager: Rc, @@ -452,7 +486,7 @@ pub struct Connection { cache_size: Cell, readonly: Cell, wal_checkpoint_disabled: Cell, - capture_data_changes: Cell, + capture_data_changes: RefCell, } impl Connection { @@ -727,11 +761,11 @@ impl Connection { self.cache_size.set(size); } - pub fn get_capture_data_changes(&self) -> bool { - self.capture_data_changes.get() + pub fn get_capture_data_changes(&self) -> std::cell::Ref<'_, CaptureDataChangesMode> { + self.capture_data_changes.borrow() } - pub fn set_capture_data_changes(&self, value: bool) { - self.capture_data_changes.set(value); + pub fn set_capture_data_changes(&self, opts: CaptureDataChangesMode) { + self.capture_data_changes.replace(opts); } #[cfg(feature = "fs")] diff --git a/core/pragma.rs b/core/pragma.rs index 6efbc8b62..4f9ea1dbb 100644 --- a/core/pragma.rs +++ b/core/pragma.rs @@ -7,7 +7,7 @@ use turso_sqlite3_parser::ast::PragmaName; bitflags! { // Flag names match those used in SQLite: // https://github.com/sqlite/sqlite/blob/b3c1884b65400da85636458298bd77cbbfdfb401/tool/mkpragmatab.tcl#L22-L29 - struct PragmaFlags: u8 { + pub struct PragmaFlags: u8 { const NeedSchema = 0x01; /* Force schema load before running */ const NoColumns = 0x02; /* OP_ResultRow called with zero columns */ const NoColumns1 = 0x04; /* zero columns if RHS argument is present */ @@ -19,9 +19,9 @@ bitflags! { } } -struct Pragma { - flags: PragmaFlags, - columns: &'static [&'static str], +pub struct Pragma { + pub flags: PragmaFlags, + pub columns: &'static [&'static str], } impl Pragma { @@ -30,7 +30,7 @@ impl Pragma { } } -fn pragma_for(pragma: PragmaName) -> Pragma { +pub fn pragma_for(pragma: PragmaName) -> Pragma { use PragmaName::*; match pragma { @@ -79,7 +79,7 @@ fn pragma_for(pragma: PragmaName) -> Pragma { ), CaptureDataChanges => Pragma::new( PragmaFlags::NeedSchema | PragmaFlags::Result0 | PragmaFlags::SchemaReq, - &["capture_data_changes"], + &["mode", "table"], ), } } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 05684147b..cba811387 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -7,9 +7,8 @@ use turso_sqlite3_parser::ast::{ use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY}; use crate::schema::{IndexColumn, Table}; use crate::translate::emitter::{emit_cdc_insns, OperationMode}; -use crate::translate::pragma::TURSO_CDC_TABLE_NAME; use crate::util::normalize_ident; -use crate::vdbe::builder::{ProgramBuilderFlags, ProgramBuilderOpts}; +use crate::vdbe::builder::ProgramBuilderOpts; use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral}; use crate::vdbe::BranchOffset; use crate::{ @@ -118,19 +117,17 @@ pub fn translate_insert( let halt_label = program.allocate_label(); let loop_start_label = program.allocate_label(); - let capture_data_changes = program - .flags() - .contains(ProgramBuilderFlags::CaptureDataChanges); - let turso_cdc_table = if capture_data_changes { - let Some(turso_cdc_table) = schema.get_table(TURSO_CDC_TABLE_NAME) else { - crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); + let cdc_table = program.capture_data_changes_mode().table(); + let cdc_table = if let Some(cdc_table) = cdc_table { + let Some(turso_cdc_table) = schema.get_table(&cdc_table) else { + crate::bail_parse_error!("no such table: {}", cdc_table); }; - let Some(turso_cdc_btree) = turso_cdc_table.btree().clone() else { - crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); + let Some(cdc_btree) = turso_cdc_table.btree().clone() else { + crate::bail_parse_error!("no such table: {}", cdc_table); }; Some(( - program.alloc_cursor_id(CursorType::BTreeTable(turso_cdc_btree.clone())), - turso_cdc_btree, + program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())), + cdc_btree, )) } else { None @@ -349,11 +346,11 @@ pub fn translate_insert( )?; } // Open turso_cdc table btree for writing if necessary - if let Some((turso_cdc_cursor_id, turso_cdc_btree)) = &turso_cdc_table { + if let Some((cdc_cursor_id, cdc_btree)) = &cdc_table { program.emit_insn(Insn::OpenWrite { - cursor_id: *turso_cdc_cursor_id, - root_page: turso_cdc_btree.root_page.into(), - name: turso_cdc_btree.name.clone(), + cursor_id: *cdc_cursor_id, + root_page: cdc_btree.root_page.into(), + name: cdc_btree.name.clone(), }); } @@ -444,12 +441,12 @@ pub fn translate_insert( } // Write record to the turso_cdc table if necessary - if let Some((turso_cdc_cursor_id, _)) = &turso_cdc_table { + if let Some((cdc_cursor_id, _)) = &cdc_table { emit_cdc_insns( &mut program, &resolver, OperationMode::INSERT, - *turso_cdc_cursor_id, + *cdc_cursor_id, rowid_reg, &table_name.0, )?; diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index d26824763..dcb04198f 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -7,12 +7,11 @@ use crate::{ schema::{Affinity, Index, IndexColumn, Table}, translate::{ plan::{DistinctCtx, Distinctness}, - pragma::TURSO_CDC_TABLE_NAME, result_row::emit_select_result, }, types::SeekOp, vdbe::{ - builder::{CursorKey, CursorType, ProgramBuilder, ProgramBuilderFlags}, + builder::{CursorKey, CursorType, ProgramBuilder}, insn::{CmpInsFlags, IdxInsertFlags, Insn}, BranchOffset, CursorID, }, @@ -119,29 +118,27 @@ pub fn init_loop( "meta_left_joins length does not match tables length" ); - let capture_data_changes = program - .flags() - .contains(ProgramBuilderFlags::CaptureDataChanges); - if capture_data_changes + let cdc_table = program.capture_data_changes_mode().table(); + if cdc_table.is_some() && matches!( mode, OperationMode::INSERT | OperationMode::UPDATE | OperationMode::DELETE ) { - let Some(turso_cdc_table) = t_ctx.resolver.schema.get_table(TURSO_CDC_TABLE_NAME) else { - crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); + let cdc_table_name = cdc_table.unwrap(); + let Some(cdc_table) = t_ctx.resolver.schema.get_table(cdc_table_name) else { + crate::bail_parse_error!("no such table: {}", cdc_table_name); }; - let Some(turso_cdc_btree) = turso_cdc_table.btree().clone() else { - crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME); + let Some(cdc_btree) = cdc_table.btree().clone() else { + crate::bail_parse_error!("no such table: {}", cdc_table_name); }; - let turso_cdc_cursor_id = - program.alloc_cursor_id(CursorType::BTreeTable(turso_cdc_btree.clone())); + let cdc_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())); program.emit_insn(Insn::OpenWrite { - cursor_id: turso_cdc_cursor_id, - root_page: turso_cdc_btree.root_page.into(), - name: turso_cdc_btree.name.clone(), + cursor_id: cdc_cursor_id, + root_page: cdc_btree.root_page.into(), + name: cdc_btree.name.clone(), }); - t_ctx.cdc_cursor_id = Some(turso_cdc_cursor_id); + t_ctx.cdc_cursor_id = Some(cdc_cursor_id); } // Initialize ephemeral indexes for distinct aggregates diff --git a/core/translate/mod.rs b/core/translate/mod.rs index a2953b2b2..6a45e96f3 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -37,7 +37,7 @@ mod values; use crate::schema::Schema; use crate::storage::pager::Pager; use crate::translate::delete::translate_delete; -use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderFlags, ProgramBuilderOpts, QueryMode}; +use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts, QueryMode}; use crate::vdbe::Program; use crate::{bail_parse_error, Connection, Result, SymbolTable}; use alter::translate_alter_table; @@ -73,14 +73,9 @@ pub fn translate( | ast::Stmt::Update(..) ); - let flags = if connection.get_capture_data_changes() { - ProgramBuilderFlags::CaptureDataChanges - } else { - ProgramBuilderFlags::empty() - }; let mut program = ProgramBuilder::new( query_mode, - flags, + connection.get_capture_data_changes().clone(), // These options will be extended whithin each translate program ProgramBuilderOpts { num_cursors: 1, diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 8d486cc44..7e8e1e0ee 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -6,15 +6,16 @@ use std::sync::Arc; use turso_sqlite3_parser::ast::{self, ColumnDefinition, Expr}; use turso_sqlite3_parser::ast::{PragmaName, QualifiedName}; +use crate::pragma::pragma_for; use crate::schema::Schema; use crate::storage::pager::AutoVacuumMode; use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE; use crate::storage::wal::CheckpointMode; use crate::translate::schema::translate_create_table; -use crate::util::{normalize_ident, parse_pragma_bool, parse_signed_number}; +use crate::util::{normalize_ident, parse_signed_number, parse_string}; use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts}; use crate::vdbe::insn::{Cookie, Insn}; -use crate::{bail_parse_error, storage, LimboError, Value}; +use crate::{bail_parse_error, storage, CaptureDataChangesMode, LimboError, Value}; use std::str::FromStr; use strum::IntoEnumIterator; @@ -208,14 +209,14 @@ fn update_pragma( } PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"), PragmaName::CaptureDataChanges => { - let value = parse_pragma_bool(&value)?; + let value = parse_string(&value)?; // todo(sivukhin): ideally, we should consistently update capture_data_changes connection flag only after successfull execution of schema change statement // but for now, let's keep it as is... - connection.set_capture_data_changes(value); - if value { - // make sure that we have turso_cdc table created - return translate_create_table( - QualifiedName::single(ast::Name(TURSO_CDC_TABLE_NAME.into())), + let opts = CaptureDataChangesMode::parse(&value)?; + if let Some(table) = &opts.table() { + // make sure that we have table created + program = translate_create_table( + QualifiedName::single(ast::Name(table.to_string())), false, ast::CreateTableBody::columns_and_constraints_from_definition( turso_cdc_table_columns(), @@ -226,8 +227,9 @@ fn update_pragma( true, schema, program, - ); + )?; } + connection.set_capture_data_changes(opts); Ok(program) } } @@ -381,9 +383,18 @@ fn query_pragma( translate_integrity_check(schema, &mut program)?; } PragmaName::CaptureDataChanges => { - program.emit_bool(connection.get_capture_data_changes(), register); - program.emit_result_row(register, 1); - program.add_pragma_result_column(pragma.to_string()); + let pragma = pragma_for(pragma); + let second_column = program.alloc_register(); + let opts = connection.get_capture_data_changes(); + program.emit_string8(opts.mode_name().to_string(), register); + if let Some(table) = &opts.table() { + program.emit_string8(table.to_string(), second_column); + } else { + program.emit_null(second_column, None); + } + program.emit_result_row(register, 2); + program.add_pragma_result_column(pragma.columns[0].to_string()); + program.add_pragma_result_column(pragma.columns[1].to_string()); } } diff --git a/core/util.rs b/core/util.rs index 87a1ac0fb..d25cac606 100644 --- a/core/util.rs +++ b/core/util.rs @@ -1044,6 +1044,19 @@ pub fn parse_signed_number(expr: &Expr) -> Result { } } +pub fn parse_string(expr: &Expr) -> Result { + match expr { + Expr::Name(ast::Name(s)) if s.len() >= 2 && s.starts_with("'") && s.ends_with("'") => { + Ok(s[1..s.len() - 1].to_string()) + } + _ => Err(LimboError::InvalidArgument(format!( + "string parameter expected, got {:?} instead", + expr + ))), + } +} + +#[allow(unused)] pub fn parse_pragma_bool(expr: &Expr) -> Result { const TRUE_VALUES: &[&str] = &["yes", "true", "on"]; const FALSE_VALUES: &[&str] = &["no", "false", "off"]; diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 795b88a8b..620c8cb19 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -1,6 +1,5 @@ use std::{cell::Cell, cmp::Ordering, rc::Rc, sync::Arc}; -use bitflags::bitflags; use tracing::{instrument, Level}; use turso_sqlite3_parser::ast::{self, TableInternalId}; @@ -13,7 +12,7 @@ use crate::{ emitter::TransactionMode, plan::{ResultSetColumn, TableReferences}, }, - Connection, Value, VirtualTable, + CaptureDataChangesMode, Connection, Value, VirtualTable, }; #[derive(Default)] @@ -111,7 +110,7 @@ pub struct ProgramBuilder { nested_level: usize, init_label: BranchOffset, start_offset: BranchOffset, - flags: ProgramBuilderFlags, + capture_data_changes_mode: CaptureDataChangesMode, } #[derive(Debug, Clone)] @@ -135,12 +134,6 @@ pub enum QueryMode { Explain, } -bitflags! { - pub struct ProgramBuilderFlags: u8 { - const CaptureDataChanges = 0x01; /* emit plans with capture data changes instructions for INSERT/DELETE/UPDATE statements */ - } -} - impl From for QueryMode { fn from(stmt: ast::Cmd) -> Self { match stmt { @@ -159,7 +152,7 @@ pub struct ProgramBuilderOpts { impl ProgramBuilder { pub fn new( query_mode: QueryMode, - flags: ProgramBuilderFlags, + capture_data_changes_mode: CaptureDataChangesMode, opts: ProgramBuilderOpts, ) -> Self { Self { @@ -184,12 +177,12 @@ impl ProgramBuilder { // These labels will be filled when `prologue()` is called init_label: BranchOffset::Placeholder, start_offset: BranchOffset::Placeholder, - flags, + capture_data_changes_mode, } } - pub fn flags(&self) -> &ProgramBuilderFlags { - &self.flags + pub fn capture_data_changes_mode(&self) -> &CaptureDataChangesMode { + &self.capture_data_changes_mode } pub fn extend(&mut self, opts: &ProgramBuilderOpts) {