allow to specify table in the capture_data_changes PRAGMA

This commit is contained in:
Nikita Sivukhin
2025-07-06 22:19:32 +04:00
parent a3732939bd
commit a988bbaffe
8 changed files with 119 additions and 79 deletions

View File

@@ -43,6 +43,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use crate::storage::{header_accessor, wal::DummyWAL};
use crate::translate::optimizer::optimize_plan;
use crate::translate::pragma::TURSO_CDC_TABLE_NAME;
use crate::util::{OpenMode, OpenOptions};
use crate::vtab::VirtualTable;
use core::str;
@@ -278,7 +279,7 @@ impl Database {
cache_size: Cell::new(default_cache_size),
readonly: Cell::new(false),
wal_checkpoint_disabled: Cell::new(false),
capture_data_changes: Cell::new(false),
capture_data_changes: RefCell::new(CaptureDataChangesMode::Off),
});
if let Err(e) = conn.register_builtins() {
return Err(LimboError::ExtensionError(e));
@@ -331,7 +332,7 @@ impl Database {
cache_size: Cell::new(default_cache_size),
readonly: Cell::new(false),
wal_checkpoint_disabled: Cell::new(false),
capture_data_changes: Cell::new(false),
capture_data_changes: RefCell::new(CaptureDataChangesMode::Off),
});
if let Err(e) = conn.register_builtins() {
@@ -436,6 +437,39 @@ fn get_schema_version(conn: &Arc<Connection>, io: &Arc<dyn IO>) -> Result<u32> {
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CaptureDataChangesMode {
Off,
RowidOnly { table: String },
}
impl CaptureDataChangesMode {
pub fn parse(value: &str) -> Result<CaptureDataChangesMode> {
let (mode, table) = value
.split_once(",")
.unwrap_or((value, TURSO_CDC_TABLE_NAME));
match mode {
"off" => Ok(CaptureDataChangesMode::Off),
"rowid-only" => Ok(CaptureDataChangesMode::RowidOnly { table: table.to_string() }),
_ => Err(LimboError::InvalidArgument(
"unexpected pragma value: expected '<mode>' or '<mode>,<cdc-table-name>' parameter where mode is one of off|rowid-only".to_string(),
))
}
}
pub fn mode_name(&self) -> &str {
match self {
CaptureDataChangesMode::Off => "off",
CaptureDataChangesMode::RowidOnly { .. } => "rowid-only",
}
}
pub fn table(&self) -> Option<&str> {
match self {
CaptureDataChangesMode::Off => None,
CaptureDataChangesMode::RowidOnly { table } => Some(table.as_str()),
}
}
}
pub struct Connection {
_db: Arc<Database>,
pager: Rc<Pager>,
@@ -452,7 +486,7 @@ pub struct Connection {
cache_size: Cell<i32>,
readonly: Cell<bool>,
wal_checkpoint_disabled: Cell<bool>,
capture_data_changes: Cell<bool>,
capture_data_changes: RefCell<CaptureDataChangesMode>,
}
impl Connection {
@@ -727,11 +761,11 @@ impl Connection {
self.cache_size.set(size);
}
pub fn get_capture_data_changes(&self) -> bool {
self.capture_data_changes.get()
pub fn get_capture_data_changes(&self) -> std::cell::Ref<'_, CaptureDataChangesMode> {
self.capture_data_changes.borrow()
}
pub fn set_capture_data_changes(&self, value: bool) {
self.capture_data_changes.set(value);
pub fn set_capture_data_changes(&self, opts: CaptureDataChangesMode) {
self.capture_data_changes.replace(opts);
}
#[cfg(feature = "fs")]

View File

@@ -7,7 +7,7 @@ use turso_sqlite3_parser::ast::PragmaName;
bitflags! {
// Flag names match those used in SQLite:
// https://github.com/sqlite/sqlite/blob/b3c1884b65400da85636458298bd77cbbfdfb401/tool/mkpragmatab.tcl#L22-L29
struct PragmaFlags: u8 {
pub struct PragmaFlags: u8 {
const NeedSchema = 0x01; /* Force schema load before running */
const NoColumns = 0x02; /* OP_ResultRow called with zero columns */
const NoColumns1 = 0x04; /* zero columns if RHS argument is present */
@@ -19,9 +19,9 @@ bitflags! {
}
}
struct Pragma {
flags: PragmaFlags,
columns: &'static [&'static str],
pub struct Pragma {
pub flags: PragmaFlags,
pub columns: &'static [&'static str],
}
impl Pragma {
@@ -30,7 +30,7 @@ impl Pragma {
}
}
fn pragma_for(pragma: PragmaName) -> Pragma {
pub fn pragma_for(pragma: PragmaName) -> Pragma {
use PragmaName::*;
match pragma {
@@ -79,7 +79,7 @@ fn pragma_for(pragma: PragmaName) -> Pragma {
),
CaptureDataChanges => Pragma::new(
PragmaFlags::NeedSchema | PragmaFlags::Result0 | PragmaFlags::SchemaReq,
&["capture_data_changes"],
&["mode", "table"],
),
}
}

View File

@@ -7,9 +7,8 @@ use turso_sqlite3_parser::ast::{
use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY};
use crate::schema::{IndexColumn, Table};
use crate::translate::emitter::{emit_cdc_insns, OperationMode};
use crate::translate::pragma::TURSO_CDC_TABLE_NAME;
use crate::util::normalize_ident;
use crate::vdbe::builder::{ProgramBuilderFlags, ProgramBuilderOpts};
use crate::vdbe::builder::ProgramBuilderOpts;
use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral};
use crate::vdbe::BranchOffset;
use crate::{
@@ -118,19 +117,17 @@ pub fn translate_insert(
let halt_label = program.allocate_label();
let loop_start_label = program.allocate_label();
let capture_data_changes = program
.flags()
.contains(ProgramBuilderFlags::CaptureDataChanges);
let turso_cdc_table = if capture_data_changes {
let Some(turso_cdc_table) = schema.get_table(TURSO_CDC_TABLE_NAME) else {
crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME);
let cdc_table = program.capture_data_changes_mode().table();
let cdc_table = if let Some(cdc_table) = cdc_table {
let Some(turso_cdc_table) = schema.get_table(&cdc_table) else {
crate::bail_parse_error!("no such table: {}", cdc_table);
};
let Some(turso_cdc_btree) = turso_cdc_table.btree().clone() else {
crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME);
let Some(cdc_btree) = turso_cdc_table.btree().clone() else {
crate::bail_parse_error!("no such table: {}", cdc_table);
};
Some((
program.alloc_cursor_id(CursorType::BTreeTable(turso_cdc_btree.clone())),
turso_cdc_btree,
program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())),
cdc_btree,
))
} else {
None
@@ -349,11 +346,11 @@ pub fn translate_insert(
)?;
}
// Open turso_cdc table btree for writing if necessary
if let Some((turso_cdc_cursor_id, turso_cdc_btree)) = &turso_cdc_table {
if let Some((cdc_cursor_id, cdc_btree)) = &cdc_table {
program.emit_insn(Insn::OpenWrite {
cursor_id: *turso_cdc_cursor_id,
root_page: turso_cdc_btree.root_page.into(),
name: turso_cdc_btree.name.clone(),
cursor_id: *cdc_cursor_id,
root_page: cdc_btree.root_page.into(),
name: cdc_btree.name.clone(),
});
}
@@ -444,12 +441,12 @@ pub fn translate_insert(
}
// Write record to the turso_cdc table if necessary
if let Some((turso_cdc_cursor_id, _)) = &turso_cdc_table {
if let Some((cdc_cursor_id, _)) = &cdc_table {
emit_cdc_insns(
&mut program,
&resolver,
OperationMode::INSERT,
*turso_cdc_cursor_id,
*cdc_cursor_id,
rowid_reg,
&table_name.0,
)?;

View File

@@ -7,12 +7,11 @@ use crate::{
schema::{Affinity, Index, IndexColumn, Table},
translate::{
plan::{DistinctCtx, Distinctness},
pragma::TURSO_CDC_TABLE_NAME,
result_row::emit_select_result,
},
types::SeekOp,
vdbe::{
builder::{CursorKey, CursorType, ProgramBuilder, ProgramBuilderFlags},
builder::{CursorKey, CursorType, ProgramBuilder},
insn::{CmpInsFlags, IdxInsertFlags, Insn},
BranchOffset, CursorID,
},
@@ -119,29 +118,27 @@ pub fn init_loop(
"meta_left_joins length does not match tables length"
);
let capture_data_changes = program
.flags()
.contains(ProgramBuilderFlags::CaptureDataChanges);
if capture_data_changes
let cdc_table = program.capture_data_changes_mode().table();
if cdc_table.is_some()
&& matches!(
mode,
OperationMode::INSERT | OperationMode::UPDATE | OperationMode::DELETE
)
{
let Some(turso_cdc_table) = t_ctx.resolver.schema.get_table(TURSO_CDC_TABLE_NAME) else {
crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME);
let cdc_table_name = cdc_table.unwrap();
let Some(cdc_table) = t_ctx.resolver.schema.get_table(cdc_table_name) else {
crate::bail_parse_error!("no such table: {}", cdc_table_name);
};
let Some(turso_cdc_btree) = turso_cdc_table.btree().clone() else {
crate::bail_parse_error!("no such table: {}", TURSO_CDC_TABLE_NAME);
let Some(cdc_btree) = cdc_table.btree().clone() else {
crate::bail_parse_error!("no such table: {}", cdc_table_name);
};
let turso_cdc_cursor_id =
program.alloc_cursor_id(CursorType::BTreeTable(turso_cdc_btree.clone()));
let cdc_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: turso_cdc_cursor_id,
root_page: turso_cdc_btree.root_page.into(),
name: turso_cdc_btree.name.clone(),
cursor_id: cdc_cursor_id,
root_page: cdc_btree.root_page.into(),
name: cdc_btree.name.clone(),
});
t_ctx.cdc_cursor_id = Some(turso_cdc_cursor_id);
t_ctx.cdc_cursor_id = Some(cdc_cursor_id);
}
// Initialize ephemeral indexes for distinct aggregates

View File

@@ -37,7 +37,7 @@ mod values;
use crate::schema::Schema;
use crate::storage::pager::Pager;
use crate::translate::delete::translate_delete;
use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderFlags, ProgramBuilderOpts, QueryMode};
use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts, QueryMode};
use crate::vdbe::Program;
use crate::{bail_parse_error, Connection, Result, SymbolTable};
use alter::translate_alter_table;
@@ -73,14 +73,9 @@ pub fn translate(
| ast::Stmt::Update(..)
);
let flags = if connection.get_capture_data_changes() {
ProgramBuilderFlags::CaptureDataChanges
} else {
ProgramBuilderFlags::empty()
};
let mut program = ProgramBuilder::new(
query_mode,
flags,
connection.get_capture_data_changes().clone(),
// These options will be extended whithin each translate program
ProgramBuilderOpts {
num_cursors: 1,

View File

@@ -6,15 +6,16 @@ use std::sync::Arc;
use turso_sqlite3_parser::ast::{self, ColumnDefinition, Expr};
use turso_sqlite3_parser::ast::{PragmaName, QualifiedName};
use crate::pragma::pragma_for;
use crate::schema::Schema;
use crate::storage::pager::AutoVacuumMode;
use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE;
use crate::storage::wal::CheckpointMode;
use crate::translate::schema::translate_create_table;
use crate::util::{normalize_ident, parse_pragma_bool, parse_signed_number};
use crate::util::{normalize_ident, parse_signed_number, parse_string};
use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts};
use crate::vdbe::insn::{Cookie, Insn};
use crate::{bail_parse_error, storage, LimboError, Value};
use crate::{bail_parse_error, storage, CaptureDataChangesMode, LimboError, Value};
use std::str::FromStr;
use strum::IntoEnumIterator;
@@ -208,14 +209,14 @@ fn update_pragma(
}
PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"),
PragmaName::CaptureDataChanges => {
let value = parse_pragma_bool(&value)?;
let value = parse_string(&value)?;
// todo(sivukhin): ideally, we should consistently update capture_data_changes connection flag only after successfull execution of schema change statement
// but for now, let's keep it as is...
connection.set_capture_data_changes(value);
if value {
// make sure that we have turso_cdc table created
return translate_create_table(
QualifiedName::single(ast::Name(TURSO_CDC_TABLE_NAME.into())),
let opts = CaptureDataChangesMode::parse(&value)?;
if let Some(table) = &opts.table() {
// make sure that we have table created
program = translate_create_table(
QualifiedName::single(ast::Name(table.to_string())),
false,
ast::CreateTableBody::columns_and_constraints_from_definition(
turso_cdc_table_columns(),
@@ -226,8 +227,9 @@ fn update_pragma(
true,
schema,
program,
);
)?;
}
connection.set_capture_data_changes(opts);
Ok(program)
}
}
@@ -381,9 +383,18 @@ fn query_pragma(
translate_integrity_check(schema, &mut program)?;
}
PragmaName::CaptureDataChanges => {
program.emit_bool(connection.get_capture_data_changes(), register);
program.emit_result_row(register, 1);
program.add_pragma_result_column(pragma.to_string());
let pragma = pragma_for(pragma);
let second_column = program.alloc_register();
let opts = connection.get_capture_data_changes();
program.emit_string8(opts.mode_name().to_string(), register);
if let Some(table) = &opts.table() {
program.emit_string8(table.to_string(), second_column);
} else {
program.emit_null(second_column, None);
}
program.emit_result_row(register, 2);
program.add_pragma_result_column(pragma.columns[0].to_string());
program.add_pragma_result_column(pragma.columns[1].to_string());
}
}

View File

@@ -1044,6 +1044,19 @@ pub fn parse_signed_number(expr: &Expr) -> Result<Value> {
}
}
pub fn parse_string(expr: &Expr) -> Result<String> {
match expr {
Expr::Name(ast::Name(s)) if s.len() >= 2 && s.starts_with("'") && s.ends_with("'") => {
Ok(s[1..s.len() - 1].to_string())
}
_ => Err(LimboError::InvalidArgument(format!(
"string parameter expected, got {:?} instead",
expr
))),
}
}
#[allow(unused)]
pub fn parse_pragma_bool(expr: &Expr) -> Result<bool> {
const TRUE_VALUES: &[&str] = &["yes", "true", "on"];
const FALSE_VALUES: &[&str] = &["no", "false", "off"];

View File

@@ -1,6 +1,5 @@
use std::{cell::Cell, cmp::Ordering, rc::Rc, sync::Arc};
use bitflags::bitflags;
use tracing::{instrument, Level};
use turso_sqlite3_parser::ast::{self, TableInternalId};
@@ -13,7 +12,7 @@ use crate::{
emitter::TransactionMode,
plan::{ResultSetColumn, TableReferences},
},
Connection, Value, VirtualTable,
CaptureDataChangesMode, Connection, Value, VirtualTable,
};
#[derive(Default)]
@@ -111,7 +110,7 @@ pub struct ProgramBuilder {
nested_level: usize,
init_label: BranchOffset,
start_offset: BranchOffset,
flags: ProgramBuilderFlags,
capture_data_changes_mode: CaptureDataChangesMode,
}
#[derive(Debug, Clone)]
@@ -135,12 +134,6 @@ pub enum QueryMode {
Explain,
}
bitflags! {
pub struct ProgramBuilderFlags: u8 {
const CaptureDataChanges = 0x01; /* emit plans with capture data changes instructions for INSERT/DELETE/UPDATE statements */
}
}
impl From<ast::Cmd> for QueryMode {
fn from(stmt: ast::Cmd) -> Self {
match stmt {
@@ -159,7 +152,7 @@ pub struct ProgramBuilderOpts {
impl ProgramBuilder {
pub fn new(
query_mode: QueryMode,
flags: ProgramBuilderFlags,
capture_data_changes_mode: CaptureDataChangesMode,
opts: ProgramBuilderOpts,
) -> Self {
Self {
@@ -184,12 +177,12 @@ impl ProgramBuilder {
// These labels will be filled when `prologue()` is called
init_label: BranchOffset::Placeholder,
start_offset: BranchOffset::Placeholder,
flags,
capture_data_changes_mode,
}
}
pub fn flags(&self) -> &ProgramBuilderFlags {
&self.flags
pub fn capture_data_changes_mode(&self) -> &CaptureDataChangesMode {
&self.capture_data_changes_mode
}
pub fn extend(&mut self, opts: &ProgramBuilderOpts) {