mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-27 21:14:21 +01:00
Merge 'Change data capture' from Nikita Sivukhin
This PR add basic CDC functionality to the `turso-db`.
### Feature components
1. `unstable_capture_data_changes_conn` pragma which allow user to turn
on/off CDC logging for **specific connection**
* CDC will have multiple modes, but for now only `off` / `rowid-
only` are supported
* Default CDC table is `turso_cdc` but user can override this with
`PRAGMA` update syntax and use arbitrary table for the CDC needs
* This can be helpful in future if turso will need to break table
format compatibility and custom tables can be a way to migrate between
different schemas
* Update syntax for the pragma accepts one string argument in
format, where only mode is set or custom cdc table name is provided as
second part of the string, separated with comma from the mode
```sql
turso> PRAGMA unstable_capture_data_changes_conn('rowid-only');
turso> PRAGMA unstable_capture_data_changes_conn('off');
turso> PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc_table');
turso> PRAGMA unstable_capture_data_changes_conn;
┌────────────┬──────────────────┐
│ mode │ table │
├────────────┼──────────────────┤
│ rowid-only │ custom_cdc_table │
└────────────┴──────────────────┘
```
2. CDC table schema right now is simple but it will be evolved soon to
support logging of row values before/after the change:
```sql
CREATE TABLE custom_cdc_table (
operation_id INTEGER PRIMARY KEY AUTOINCREMENT,
operation_time INTEGER, -- unixepoch() at the moment of insert, can drift if machine clocks is not monotonic
operation_type INTEGER, -- -1 = delete, 0 = update, 1 = insert
table_name TEXT,
id
)
```
* Note, that `operation_id` is marked as `AUTOINCREMENT` but `turso-
db` needs to implement
https://github.com/tursodatabase/turso/issues/1976 in order to properly
support that keyword
3. Query planner changes are made in `INSERT`/`UPDATE`/`DELETE` plans in
order to emit updates to the CDC table for changes in the table
* Note, that row `UPDATE` which change primary key generate `DELETE` +
`INSERT` statement instead of single `UPDATE`
### Implementation details
- `PRAGMA` to enable CDC is **unstable** which means that publicly
visible side-effects/public API can change in future (and it will change
soon in order to support more rich CDC modes)
- CDC table is just a regular table with its benefits and downsides:
* benefits: user can perform maintenance operations with that table
just with regular SQL like `DELETE FROM turso_cdc WHERE operation_id <
?` to cleanup old not needed CDC entries
* downsides: user can accidentally make unwanted change to CDC table
- Changes to CDC table is not logged to itself
* Note, that different connections (e.g. `C1`, `C2`) can have
different CDC tables set (e.g. `A` and `B`) - in which case changes made
to CDC table `B` through connection `C1` will be reflected in CDC table
`A`
Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>
Closes #1926
This commit is contained in:
44
core/lib.rs
44
core/lib.rs
@@ -43,6 +43,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
|
||||
|
||||
use crate::storage::{header_accessor, wal::DummyWAL};
|
||||
use crate::translate::optimizer::optimize_plan;
|
||||
use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME;
|
||||
use crate::util::{OpenMode, OpenOptions};
|
||||
use crate::vtab::VirtualTable;
|
||||
use core::str;
|
||||
@@ -278,6 +279,7 @@ impl Database {
|
||||
cache_size: Cell::new(default_cache_size),
|
||||
readonly: Cell::new(false),
|
||||
wal_checkpoint_disabled: Cell::new(false),
|
||||
capture_data_changes: RefCell::new(CaptureDataChangesMode::Off),
|
||||
});
|
||||
if let Err(e) = conn.register_builtins() {
|
||||
return Err(LimboError::ExtensionError(e));
|
||||
@@ -330,6 +332,7 @@ impl Database {
|
||||
cache_size: Cell::new(default_cache_size),
|
||||
readonly: Cell::new(false),
|
||||
wal_checkpoint_disabled: Cell::new(false),
|
||||
capture_data_changes: RefCell::new(CaptureDataChangesMode::Off),
|
||||
});
|
||||
|
||||
if let Err(e) = conn.register_builtins() {
|
||||
@@ -434,6 +437,39 @@ fn get_schema_version(conn: &Arc<Connection>, io: &Arc<dyn IO>) -> Result<u32> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub enum CaptureDataChangesMode {
|
||||
Off,
|
||||
RowidOnly { table: String },
|
||||
}
|
||||
|
||||
impl CaptureDataChangesMode {
|
||||
pub fn parse(value: &str) -> Result<CaptureDataChangesMode> {
|
||||
let (mode, table) = value
|
||||
.split_once(",")
|
||||
.unwrap_or((value, TURSO_CDC_DEFAULT_TABLE_NAME));
|
||||
match mode {
|
||||
"off" => Ok(CaptureDataChangesMode::Off),
|
||||
"rowid-only" => Ok(CaptureDataChangesMode::RowidOnly { table: table.to_string() }),
|
||||
_ => Err(LimboError::InvalidArgument(
|
||||
"unexpected pragma value: expected '<mode>' or '<mode>,<cdc-table-name>' parameter where mode is one of off|rowid-only".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
pub fn mode_name(&self) -> &str {
|
||||
match self {
|
||||
CaptureDataChangesMode::Off => "off",
|
||||
CaptureDataChangesMode::RowidOnly { .. } => "rowid-only",
|
||||
}
|
||||
}
|
||||
pub fn table(&self) -> Option<&str> {
|
||||
match self {
|
||||
CaptureDataChangesMode::Off => None,
|
||||
CaptureDataChangesMode::RowidOnly { table } => Some(table.as_str()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Connection {
|
||||
_db: Arc<Database>,
|
||||
pager: Rc<Pager>,
|
||||
@@ -450,6 +486,7 @@ pub struct Connection {
|
||||
cache_size: Cell<i32>,
|
||||
readonly: Cell<bool>,
|
||||
wal_checkpoint_disabled: Cell<bool>,
|
||||
capture_data_changes: RefCell<CaptureDataChangesMode>,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
@@ -724,6 +761,13 @@ impl Connection {
|
||||
self.cache_size.set(size);
|
||||
}
|
||||
|
||||
pub fn get_capture_data_changes(&self) -> std::cell::Ref<'_, CaptureDataChangesMode> {
|
||||
self.capture_data_changes.borrow()
|
||||
}
|
||||
pub fn set_capture_data_changes(&self, opts: CaptureDataChangesMode) {
|
||||
self.capture_data_changes.replace(opts);
|
||||
}
|
||||
|
||||
#[cfg(feature = "fs")]
|
||||
pub fn open_new(&self, path: &str, vfs: &str) -> Result<(Arc<dyn IO>, Arc<Database>)> {
|
||||
Database::open_with_vfs(&self._db, path, vfs)
|
||||
|
||||
@@ -7,21 +7,21 @@ use turso_sqlite3_parser::ast::PragmaName;
|
||||
bitflags! {
|
||||
// Flag names match those used in SQLite:
|
||||
// https://github.com/sqlite/sqlite/blob/b3c1884b65400da85636458298bd77cbbfdfb401/tool/mkpragmatab.tcl#L22-L29
|
||||
struct PragmaFlags: u8 {
|
||||
const NeedSchema = 0x01;
|
||||
const NoColumns = 0x02;
|
||||
const NoColumns1 = 0x04;
|
||||
const ReadOnly = 0x08;
|
||||
const Result0 = 0x10;
|
||||
const Result1 = 0x20;
|
||||
const SchemaOpt = 0x40;
|
||||
const SchemaReq = 0x80;
|
||||
pub struct PragmaFlags: u8 {
|
||||
const NeedSchema = 0x01; /* Force schema load before running */
|
||||
const NoColumns = 0x02; /* OP_ResultRow called with zero columns */
|
||||
const NoColumns1 = 0x04; /* zero columns if RHS argument is present */
|
||||
const ReadOnly = 0x08; /* Read-only HEADER_VALUE */
|
||||
const Result0 = 0x10; /* Acts as query when no argument */
|
||||
const Result1 = 0x20; /* Acts as query when has one argument */
|
||||
const SchemaOpt = 0x40; /* Schema restricts name search if present */
|
||||
const SchemaReq = 0x80; /* Schema required - "main" is default */
|
||||
}
|
||||
}
|
||||
|
||||
struct Pragma {
|
||||
flags: PragmaFlags,
|
||||
columns: &'static [&'static str],
|
||||
pub struct Pragma {
|
||||
pub flags: PragmaFlags,
|
||||
pub columns: &'static [&'static str],
|
||||
}
|
||||
|
||||
impl Pragma {
|
||||
@@ -30,7 +30,7 @@ impl Pragma {
|
||||
}
|
||||
}
|
||||
|
||||
fn pragma_for(pragma: PragmaName) -> Pragma {
|
||||
pub fn pragma_for(pragma: PragmaName) -> Pragma {
|
||||
use PragmaName::*;
|
||||
|
||||
match pragma {
|
||||
@@ -77,6 +77,10 @@ fn pragma_for(pragma: PragmaName) -> Pragma {
|
||||
PragmaFlags::NeedSchema | PragmaFlags::ReadOnly | PragmaFlags::Result0,
|
||||
&["message"],
|
||||
),
|
||||
UnstableCaptureDataChangesConn => Pragma::new(
|
||||
PragmaFlags::NeedSchema | PragmaFlags::Result0 | PragmaFlags::SchemaReq,
|
||||
&["mode", "table"],
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ use crate::vdbe::builder::{CursorKey, CursorType, ProgramBuilder};
|
||||
use crate::vdbe::insn::{CmpInsFlags, IdxInsertFlags, InsertFlags, RegisterOrLiteral};
|
||||
use crate::vdbe::CursorID;
|
||||
use crate::vdbe::{insn::Insn, BranchOffset};
|
||||
use crate::{Result, SymbolTable};
|
||||
use crate::{bail_parse_error, Result, SymbolTable};
|
||||
|
||||
pub struct Resolver<'a> {
|
||||
pub schema: &'a Schema,
|
||||
@@ -149,6 +149,8 @@ pub struct TranslateCtx<'a> {
|
||||
/// - First: all `GROUP BY` expressions, in the order they appear in the `GROUP BY` clause.
|
||||
/// - Then: remaining non-aggregate expressions that are not part of `GROUP BY`.
|
||||
pub non_aggregate_expressions: Vec<(&'a Expr, bool)>,
|
||||
/// Cursor id for turso_cdc table (if capture_data_changes=on is set and query can modify the data)
|
||||
pub cdc_cursor_id: Option<usize>,
|
||||
}
|
||||
|
||||
impl<'a> TranslateCtx<'a> {
|
||||
@@ -175,6 +177,7 @@ impl<'a> TranslateCtx<'a> {
|
||||
result_columns_to_skip_in_orderby_sorter: None,
|
||||
resolver: Resolver::new(schema, syms),
|
||||
non_aggregate_expressions: Vec::new(),
|
||||
cdc_cursor_id: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -566,6 +569,22 @@ fn emit_delete_insns(
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(turso_cdc_cursor_id) = t_ctx.cdc_cursor_id {
|
||||
let rowid_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::RowId {
|
||||
cursor_id: main_table_cursor_id,
|
||||
dest: rowid_reg,
|
||||
});
|
||||
emit_cdc_insns(
|
||||
program,
|
||||
&t_ctx.resolver,
|
||||
OperationMode::DELETE,
|
||||
turso_cdc_cursor_id,
|
||||
rowid_reg,
|
||||
table_reference.table.get_name(),
|
||||
)?;
|
||||
}
|
||||
|
||||
program.emit_insn(Insn::Delete {
|
||||
cursor_id: main_table_cursor_id,
|
||||
});
|
||||
@@ -1076,6 +1095,53 @@ fn emit_update_insns(
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id {
|
||||
let rowid_reg = program.alloc_register();
|
||||
if has_user_provided_rowid {
|
||||
program.emit_insn(Insn::RowId {
|
||||
cursor_id,
|
||||
dest: rowid_reg,
|
||||
});
|
||||
emit_cdc_insns(
|
||||
program,
|
||||
&t_ctx.resolver,
|
||||
OperationMode::DELETE,
|
||||
cdc_cursor_id,
|
||||
rowid_reg,
|
||||
table_ref.table.get_name(),
|
||||
)?;
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: rowid_set_clause_reg.expect(
|
||||
"rowid_set_clause_reg must be set because has_user_provided_rowid is true",
|
||||
),
|
||||
dst_reg: rowid_reg,
|
||||
amount: 1,
|
||||
});
|
||||
emit_cdc_insns(
|
||||
program,
|
||||
&t_ctx.resolver,
|
||||
OperationMode::INSERT,
|
||||
cdc_cursor_id,
|
||||
rowid_reg,
|
||||
table_ref.table.get_name(),
|
||||
)?;
|
||||
} else {
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: rowid_set_clause_reg.unwrap_or(beg),
|
||||
dst_reg: rowid_reg,
|
||||
amount: 1,
|
||||
});
|
||||
emit_cdc_insns(
|
||||
program,
|
||||
&t_ctx.resolver,
|
||||
OperationMode::UPDATE,
|
||||
cdc_cursor_id,
|
||||
rowid_reg,
|
||||
table_ref.table.get_name(),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
// If we are updating the rowid, we cannot rely on overwrite on the
|
||||
// Insert instruction to update the cell. We need to first delete the current cell
|
||||
// and later insert the updated record
|
||||
@@ -1115,6 +1181,79 @@ fn emit_update_insns(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn emit_cdc_insns(
|
||||
program: &mut ProgramBuilder,
|
||||
resolver: &Resolver,
|
||||
operation_mode: OperationMode,
|
||||
cdc_cursor_id: usize,
|
||||
rowid_reg: usize,
|
||||
table_name: &str,
|
||||
) -> Result<()> {
|
||||
// (operation_id INTEGER PRIMARY KEY AUTOINCREMENT, operation_time INTEGER, operation_type INTEGER, table_name TEXT, id)
|
||||
let turso_cdc_registers = program.alloc_registers(5);
|
||||
program.emit_insn(Insn::Null {
|
||||
dest: turso_cdc_registers,
|
||||
dest_end: None,
|
||||
});
|
||||
program.mark_last_insn_constant();
|
||||
|
||||
let Some(unixepoch_fn) = resolver.resolve_function("unixepoch", 0) else {
|
||||
bail_parse_error!("no function {}", "unixepoch");
|
||||
};
|
||||
let unixepoch_fn_ctx = crate::function::FuncCtx {
|
||||
func: unixepoch_fn,
|
||||
arg_count: 0,
|
||||
};
|
||||
|
||||
program.emit_insn(Insn::Function {
|
||||
constant_mask: 0,
|
||||
start_reg: 0,
|
||||
dest: turso_cdc_registers + 1,
|
||||
func: unixepoch_fn_ctx,
|
||||
});
|
||||
|
||||
let operation_type = match operation_mode {
|
||||
OperationMode::INSERT => 1,
|
||||
OperationMode::UPDATE | OperationMode::SELECT => 0,
|
||||
OperationMode::DELETE => -1,
|
||||
};
|
||||
program.emit_int(operation_type, turso_cdc_registers + 2);
|
||||
program.mark_last_insn_constant();
|
||||
|
||||
program.emit_string8(table_name.to_string(), turso_cdc_registers + 3);
|
||||
program.mark_last_insn_constant();
|
||||
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: rowid_reg,
|
||||
dst_reg: turso_cdc_registers + 4,
|
||||
amount: 0,
|
||||
});
|
||||
|
||||
let rowid_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::NewRowid {
|
||||
cursor: cdc_cursor_id,
|
||||
rowid_reg,
|
||||
prev_largest_reg: 0, // todo(sivukhin): properly set value here from sqlite_sequence table when AUTOINCREMENT will be properly implemented in Turso
|
||||
});
|
||||
|
||||
let record_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: turso_cdc_registers,
|
||||
count: 5,
|
||||
dest_reg: record_reg,
|
||||
index_name: None,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Insert {
|
||||
cursor: cdc_cursor_id,
|
||||
key_reg: rowid_reg,
|
||||
record_reg,
|
||||
flag: InsertFlags::new(),
|
||||
table_name: "".to_string(),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Initialize the limit/offset counters and registers.
|
||||
/// In case of compound SELECTs, the limit counter is initialized only once,
|
||||
/// hence [LimitCtx::initialize_counter] being false in those cases.
|
||||
|
||||
@@ -6,6 +6,7 @@ use turso_sqlite3_parser::ast::{
|
||||
|
||||
use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY};
|
||||
use crate::schema::{IndexColumn, Table};
|
||||
use crate::translate::emitter::{emit_cdc_insns, OperationMode};
|
||||
use crate::util::normalize_ident;
|
||||
use crate::vdbe::builder::ProgramBuilderOpts;
|
||||
use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral};
|
||||
@@ -116,6 +117,26 @@ pub fn translate_insert(
|
||||
let halt_label = program.allocate_label();
|
||||
let loop_start_label = program.allocate_label();
|
||||
|
||||
let cdc_table = program.capture_data_changes_mode().table();
|
||||
let cdc_table = if let Some(cdc_table) = cdc_table {
|
||||
if table.get_name() != cdc_table {
|
||||
let Some(turso_cdc_table) = schema.get_table(cdc_table) else {
|
||||
crate::bail_parse_error!("no such table: {}", cdc_table);
|
||||
};
|
||||
let Some(cdc_btree) = turso_cdc_table.btree().clone() else {
|
||||
crate::bail_parse_error!("no such table: {}", cdc_table);
|
||||
};
|
||||
Some((
|
||||
program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())),
|
||||
cdc_btree,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut yield_reg_opt = None;
|
||||
let mut temp_table_ctx = None;
|
||||
let (num_values, cursor_id) = match body {
|
||||
@@ -328,6 +349,15 @@ pub fn translate_insert(
|
||||
&resolver,
|
||||
)?;
|
||||
}
|
||||
// Open turso_cdc table btree for writing if necessary
|
||||
if let Some((cdc_cursor_id, cdc_btree)) = &cdc_table {
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: *cdc_cursor_id,
|
||||
root_page: cdc_btree.root_page.into(),
|
||||
name: cdc_btree.name.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
// Open all the index btrees for writing
|
||||
for idx_cursor in idx_cursors.iter() {
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
@@ -414,6 +444,18 @@ pub fn translate_insert(
|
||||
_ => (),
|
||||
}
|
||||
|
||||
// Write record to the turso_cdc table if necessary
|
||||
if let Some((cdc_cursor_id, _)) = &cdc_table {
|
||||
emit_cdc_insns(
|
||||
&mut program,
|
||||
&resolver,
|
||||
OperationMode::INSERT,
|
||||
*cdc_cursor_id,
|
||||
rowid_reg,
|
||||
&table_name.0,
|
||||
)?;
|
||||
}
|
||||
|
||||
let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?;
|
||||
for index_col_mapping in index_col_mappings {
|
||||
// find which cursor we opened earlier for this index
|
||||
|
||||
@@ -117,6 +117,33 @@ pub fn init_loop(
|
||||
t_ctx.meta_left_joins.len() == tables.joined_tables().len(),
|
||||
"meta_left_joins length does not match tables length"
|
||||
);
|
||||
|
||||
let cdc_table = program.capture_data_changes_mode().table();
|
||||
if cdc_table.is_some()
|
||||
&& matches!(
|
||||
mode,
|
||||
OperationMode::INSERT | OperationMode::UPDATE | OperationMode::DELETE
|
||||
)
|
||||
{
|
||||
assert!(tables.joined_tables().len() == 1);
|
||||
let cdc_table_name = cdc_table.unwrap();
|
||||
if tables.joined_tables()[0].table.get_name() != cdc_table_name {
|
||||
let Some(cdc_table) = t_ctx.resolver.schema.get_table(cdc_table_name) else {
|
||||
crate::bail_parse_error!("no such table: {}", cdc_table_name);
|
||||
};
|
||||
let Some(cdc_btree) = cdc_table.btree().clone() else {
|
||||
crate::bail_parse_error!("no such table: {}", cdc_table_name);
|
||||
};
|
||||
let cdc_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone()));
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: cdc_cursor_id,
|
||||
root_page: cdc_btree.root_page.into(),
|
||||
name: cdc_btree.name.clone(),
|
||||
});
|
||||
t_ctx.cdc_cursor_id = Some(cdc_cursor_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize ephemeral indexes for distinct aggregates
|
||||
for (i, agg) in aggregates
|
||||
.iter_mut()
|
||||
|
||||
@@ -75,6 +75,7 @@ pub fn translate(
|
||||
|
||||
let mut program = ProgramBuilder::new(
|
||||
query_mode,
|
||||
connection.get_capture_data_changes().clone(),
|
||||
// These options will be extended whithin each translate program
|
||||
ProgramBuilderOpts {
|
||||
num_cursors: 1,
|
||||
|
||||
@@ -3,17 +3,19 @@
|
||||
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use turso_sqlite3_parser::ast::PragmaName;
|
||||
use turso_sqlite3_parser::ast::{self, Expr};
|
||||
use turso_sqlite3_parser::ast::{self, ColumnDefinition, Expr};
|
||||
use turso_sqlite3_parser::ast::{PragmaName, QualifiedName};
|
||||
|
||||
use crate::pragma::pragma_for;
|
||||
use crate::schema::Schema;
|
||||
use crate::storage::pager::AutoVacuumMode;
|
||||
use crate::storage::sqlite3_ondisk::MIN_PAGE_CACHE_SIZE;
|
||||
use crate::storage::wal::CheckpointMode;
|
||||
use crate::util::{normalize_ident, parse_signed_number};
|
||||
use crate::translate::schema::translate_create_table;
|
||||
use crate::util::{normalize_ident, parse_signed_number, parse_string};
|
||||
use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts};
|
||||
use crate::vdbe::insn::{Cookie, Insn};
|
||||
use crate::{bail_parse_error, storage, LimboError, Value};
|
||||
use crate::{bail_parse_error, storage, CaptureDataChangesMode, LimboError, Value};
|
||||
use std::str::FromStr;
|
||||
use strum::IntoEnumIterator;
|
||||
|
||||
@@ -57,17 +59,15 @@ pub fn translate_pragma(
|
||||
Err(_) => bail_parse_error!("Not a valid pragma name"),
|
||||
};
|
||||
|
||||
match body {
|
||||
None => {
|
||||
query_pragma(pragma, schema, None, pager, connection, &mut program)?;
|
||||
}
|
||||
let mut program = match body {
|
||||
None => query_pragma(pragma, schema, None, pager, connection, program)?,
|
||||
Some(ast::PragmaBody::Equals(value) | ast::PragmaBody::Call(value)) => match pragma {
|
||||
PragmaName::TableInfo => {
|
||||
query_pragma(pragma, schema, Some(value), pager, connection, &mut program)?;
|
||||
query_pragma(pragma, schema, Some(value), pager, connection, program)?
|
||||
}
|
||||
_ => {
|
||||
write = true;
|
||||
update_pragma(pragma, schema, value, pager, connection, &mut program)?;
|
||||
update_pragma(pragma, schema, value, pager, connection, program)?
|
||||
}
|
||||
},
|
||||
};
|
||||
@@ -85,8 +85,8 @@ fn update_pragma(
|
||||
value: ast::Expr,
|
||||
pager: Rc<Pager>,
|
||||
connection: Arc<crate::Connection>,
|
||||
program: &mut ProgramBuilder,
|
||||
) -> crate::Result<()> {
|
||||
mut program: ProgramBuilder,
|
||||
) -> crate::Result<ProgramBuilder> {
|
||||
match pragma {
|
||||
PragmaName::CacheSize => {
|
||||
let cache_size = match parse_signed_number(&value)? {
|
||||
@@ -95,42 +95,33 @@ fn update_pragma(
|
||||
_ => bail_parse_error!("Invalid value for cache size pragma"),
|
||||
};
|
||||
update_cache_size(cache_size, pager, connection)?;
|
||||
Ok(())
|
||||
}
|
||||
PragmaName::JournalMode => {
|
||||
query_pragma(
|
||||
PragmaName::JournalMode,
|
||||
schema,
|
||||
None,
|
||||
pager,
|
||||
connection,
|
||||
program,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
PragmaName::LegacyFileFormat => Ok(()),
|
||||
PragmaName::WalCheckpoint => {
|
||||
query_pragma(
|
||||
PragmaName::WalCheckpoint,
|
||||
schema,
|
||||
Some(value),
|
||||
pager,
|
||||
connection,
|
||||
program,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
PragmaName::PageCount => {
|
||||
query_pragma(
|
||||
PragmaName::PageCount,
|
||||
schema,
|
||||
None,
|
||||
pager,
|
||||
connection,
|
||||
program,
|
||||
)?;
|
||||
Ok(())
|
||||
Ok(program)
|
||||
}
|
||||
PragmaName::JournalMode => query_pragma(
|
||||
PragmaName::JournalMode,
|
||||
schema,
|
||||
None,
|
||||
pager,
|
||||
connection,
|
||||
program,
|
||||
),
|
||||
PragmaName::LegacyFileFormat => Ok(program),
|
||||
PragmaName::WalCheckpoint => query_pragma(
|
||||
PragmaName::WalCheckpoint,
|
||||
schema,
|
||||
Some(value),
|
||||
pager,
|
||||
connection,
|
||||
program,
|
||||
),
|
||||
PragmaName::PageCount => query_pragma(
|
||||
PragmaName::PageCount,
|
||||
schema,
|
||||
None,
|
||||
pager,
|
||||
connection,
|
||||
program,
|
||||
),
|
||||
PragmaName::UserVersion => {
|
||||
let data = parse_signed_number(&value)?;
|
||||
let version_value = match data {
|
||||
@@ -145,7 +136,7 @@ fn update_pragma(
|
||||
value: version_value,
|
||||
p5: 1,
|
||||
});
|
||||
Ok(())
|
||||
Ok(program)
|
||||
}
|
||||
PragmaName::SchemaVersion => {
|
||||
// TODO: Implement updating schema_version
|
||||
@@ -214,9 +205,33 @@ fn update_pragma(
|
||||
value: auto_vacuum_mode - 1,
|
||||
p5: 0,
|
||||
});
|
||||
Ok(())
|
||||
Ok(program)
|
||||
}
|
||||
PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"),
|
||||
PragmaName::UnstableCaptureDataChangesConn => {
|
||||
let value = parse_string(&value)?;
|
||||
// todo(sivukhin): ideally, we should consistently update capture_data_changes connection flag only after successfull execution of schema change statement
|
||||
// but for now, let's keep it as is...
|
||||
let opts = CaptureDataChangesMode::parse(&value)?;
|
||||
if let Some(table) = &opts.table() {
|
||||
// make sure that we have table created
|
||||
program = translate_create_table(
|
||||
QualifiedName::single(ast::Name(table.to_string())),
|
||||
false,
|
||||
ast::CreateTableBody::columns_and_constraints_from_definition(
|
||||
turso_cdc_table_columns(),
|
||||
None,
|
||||
ast::TableOptions::NONE,
|
||||
)
|
||||
.unwrap(),
|
||||
true,
|
||||
schema,
|
||||
program,
|
||||
)?;
|
||||
}
|
||||
connection.set_capture_data_changes(opts);
|
||||
Ok(program)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -226,8 +241,8 @@ fn query_pragma(
|
||||
value: Option<ast::Expr>,
|
||||
pager: Rc<Pager>,
|
||||
connection: Arc<crate::Connection>,
|
||||
program: &mut ProgramBuilder,
|
||||
) -> crate::Result<()> {
|
||||
mut program: ProgramBuilder,
|
||||
) -> crate::Result<ProgramBuilder> {
|
||||
let register = program.alloc_register();
|
||||
match pragma {
|
||||
PragmaName::CacheSize => {
|
||||
@@ -365,11 +380,25 @@ fn query_pragma(
|
||||
program.emit_result_row(register, 1);
|
||||
}
|
||||
PragmaName::IntegrityCheck => {
|
||||
translate_integrity_check(schema, program)?;
|
||||
translate_integrity_check(schema, &mut program)?;
|
||||
}
|
||||
PragmaName::UnstableCaptureDataChangesConn => {
|
||||
let pragma = pragma_for(pragma);
|
||||
let second_column = program.alloc_register();
|
||||
let opts = connection.get_capture_data_changes();
|
||||
program.emit_string8(opts.mode_name().to_string(), register);
|
||||
if let Some(table) = &opts.table() {
|
||||
program.emit_string8(table.to_string(), second_column);
|
||||
} else {
|
||||
program.emit_null(second_column, None);
|
||||
}
|
||||
program.emit_result_row(register, 2);
|
||||
program.add_pragma_result_column(pragma.columns[0].to_string());
|
||||
program.add_pragma_result_column(pragma.columns[1].to_string());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(program)
|
||||
}
|
||||
|
||||
fn update_auto_vacuum_mode(
|
||||
@@ -435,3 +464,53 @@ fn update_cache_size(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub const TURSO_CDC_DEFAULT_TABLE_NAME: &str = "turso_cdc";
|
||||
fn turso_cdc_table_columns() -> Vec<ColumnDefinition> {
|
||||
vec![
|
||||
ast::ColumnDefinition {
|
||||
col_name: ast::Name("operation_id".to_string()),
|
||||
col_type: Some(ast::Type {
|
||||
name: "INTEGER".to_string(),
|
||||
size: None,
|
||||
}),
|
||||
constraints: vec![ast::NamedColumnConstraint {
|
||||
name: None,
|
||||
constraint: ast::ColumnConstraint::PrimaryKey {
|
||||
order: None,
|
||||
conflict_clause: None,
|
||||
auto_increment: true,
|
||||
},
|
||||
}],
|
||||
},
|
||||
ast::ColumnDefinition {
|
||||
col_name: ast::Name("operation_time".to_string()),
|
||||
col_type: Some(ast::Type {
|
||||
name: "INTEGER".to_string(),
|
||||
size: None,
|
||||
}),
|
||||
constraints: vec![],
|
||||
},
|
||||
ast::ColumnDefinition {
|
||||
col_name: ast::Name("operation_type".to_string()),
|
||||
col_type: Some(ast::Type {
|
||||
name: "INTEGER".to_string(),
|
||||
size: None,
|
||||
}),
|
||||
constraints: vec![],
|
||||
},
|
||||
ast::ColumnDefinition {
|
||||
col_name: ast::Name("table_name".to_string()),
|
||||
col_type: Some(ast::Type {
|
||||
name: "TEXT".to_string(),
|
||||
size: None,
|
||||
}),
|
||||
constraints: vec![],
|
||||
},
|
||||
ast::ColumnDefinition {
|
||||
col_name: ast::Name("id".to_string()),
|
||||
col_type: None,
|
||||
constraints: vec![],
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
@@ -82,6 +82,7 @@ pub fn emit_subquery(
|
||||
reg_limit_offset_sum: None,
|
||||
resolver: Resolver::new(t_ctx.resolver.schema, t_ctx.resolver.symbol_table),
|
||||
non_aggregate_expressions: Vec::new(),
|
||||
cdc_cursor_id: None,
|
||||
};
|
||||
let subquery_body_end_label = program.allocate_label();
|
||||
program.emit_insn(Insn::InitCoroutine {
|
||||
|
||||
54
core/util.rs
54
core/util.rs
@@ -1044,6 +1044,41 @@ pub fn parse_signed_number(expr: &Expr) -> Result<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_string(expr: &Expr) -> Result<String> {
|
||||
match expr {
|
||||
Expr::Name(ast::Name(s)) if s.len() >= 2 && s.starts_with("'") && s.ends_with("'") => {
|
||||
Ok(s[1..s.len() - 1].to_string())
|
||||
}
|
||||
_ => Err(LimboError::InvalidArgument(format!(
|
||||
"string parameter expected, got {:?} instead",
|
||||
expr
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub fn parse_pragma_bool(expr: &Expr) -> Result<bool> {
|
||||
const TRUE_VALUES: &[&str] = &["yes", "true", "on"];
|
||||
const FALSE_VALUES: &[&str] = &["no", "false", "off"];
|
||||
if let Ok(number) = parse_signed_number(expr) {
|
||||
if let Value::Integer(x @ (0 | 1)) = number {
|
||||
return Ok(x != 0);
|
||||
}
|
||||
} else if let Expr::Name(name) = expr {
|
||||
let ident = normalize_ident(&name.0);
|
||||
if TRUE_VALUES.contains(&ident.as_str()) {
|
||||
return Ok(true);
|
||||
}
|
||||
if FALSE_VALUES.contains(&ident.as_str()) {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
Err(LimboError::InvalidArgument(
|
||||
"boolean pragma value must be either 0|1 integer or yes|true|on|no|false|off token"
|
||||
.to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
// for TVF's we need these at planning time so we cannot emit translate_expr
|
||||
pub fn vtable_args(args: &[ast::Expr]) -> Vec<turso_ext::Value> {
|
||||
let mut vtable_args = Vec::new();
|
||||
@@ -1076,7 +1111,7 @@ pub fn vtable_args(args: &[ast::Expr]) -> Vec<turso_ext::Value> {
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use super::*;
|
||||
use turso_sqlite3_parser::ast::{self, Expr, Id, Literal, Operator::*, Type};
|
||||
use turso_sqlite3_parser::ast::{self, Expr, Id, Literal, Name, Operator::*, Type};
|
||||
|
||||
#[test]
|
||||
fn test_normalize_ident() {
|
||||
@@ -2031,4 +2066,21 @@ pub mod tests {
|
||||
Value::Float(-9.223_372_036_854_776e18)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_pragma_bool() {
|
||||
assert!(parse_pragma_bool(&Expr::Literal(Literal::Numeric("1".into()))).unwrap(),);
|
||||
assert!(parse_pragma_bool(&Expr::Name(Name("true".into()))).unwrap(),);
|
||||
assert!(parse_pragma_bool(&Expr::Name(Name("on".into()))).unwrap(),);
|
||||
assert!(parse_pragma_bool(&Expr::Name(Name("yes".into()))).unwrap(),);
|
||||
|
||||
assert!(!parse_pragma_bool(&Expr::Literal(Literal::Numeric("0".into()))).unwrap(),);
|
||||
assert!(!parse_pragma_bool(&Expr::Name(Name("false".into()))).unwrap(),);
|
||||
assert!(!parse_pragma_bool(&Expr::Name(Name("off".into()))).unwrap(),);
|
||||
assert!(!parse_pragma_bool(&Expr::Name(Name("no".into()))).unwrap(),);
|
||||
|
||||
assert!(parse_pragma_bool(&Expr::Name(Name("nono".into()))).is_err());
|
||||
assert!(parse_pragma_bool(&Expr::Name(Name("10".into()))).is_err());
|
||||
assert!(parse_pragma_bool(&Expr::Name(Name("-1".into()))).is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::{
|
||||
emitter::TransactionMode,
|
||||
plan::{ResultSetColumn, TableReferences},
|
||||
},
|
||||
Connection, Value, VirtualTable,
|
||||
CaptureDataChangesMode, Connection, Value, VirtualTable,
|
||||
};
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -110,6 +110,7 @@ pub struct ProgramBuilder {
|
||||
nested_level: usize,
|
||||
init_label: BranchOffset,
|
||||
start_offset: BranchOffset,
|
||||
capture_data_changes_mode: CaptureDataChangesMode,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -149,7 +150,11 @@ pub struct ProgramBuilderOpts {
|
||||
}
|
||||
|
||||
impl ProgramBuilder {
|
||||
pub fn new(query_mode: QueryMode, opts: ProgramBuilderOpts) -> Self {
|
||||
pub fn new(
|
||||
query_mode: QueryMode,
|
||||
capture_data_changes_mode: CaptureDataChangesMode,
|
||||
opts: ProgramBuilderOpts,
|
||||
) -> Self {
|
||||
Self {
|
||||
table_reference_counter: TableRefIdCounter::new(),
|
||||
next_free_register: 1,
|
||||
@@ -172,9 +177,14 @@ impl ProgramBuilder {
|
||||
// These labels will be filled when `prologue()` is called
|
||||
init_label: BranchOffset::Placeholder,
|
||||
start_offset: BranchOffset::Placeholder,
|
||||
capture_data_changes_mode,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn capture_data_changes_mode(&self) -> &CaptureDataChangesMode {
|
||||
&self.capture_data_changes_mode
|
||||
}
|
||||
|
||||
pub fn extend(&mut self, opts: &ProgramBuilderOpts) {
|
||||
self.insns.reserve(opts.approx_num_insns);
|
||||
self.cursor_ref.reserve(opts.num_cursors);
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
mod test_cdc;
|
||||
mod test_function_rowid;
|
||||
|
||||
557
tests/integration/functions/test_cdc.rs
Normal file
557
tests/integration/functions/test_cdc.rs
Normal file
@@ -0,0 +1,557 @@
|
||||
use rusqlite::types::Value;
|
||||
|
||||
use crate::common::{limbo_exec_rows, TempDatabase};
|
||||
|
||||
fn replace_column_with_null(rows: Vec<Vec<Value>>, column: usize) -> Vec<Vec<Value>> {
|
||||
rows.into_iter()
|
||||
.map(|row| {
|
||||
row.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, value)| if i == column { Value::Null } else { value })
|
||||
.collect()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cdc_simple() {
|
||||
let db = TempDatabase::new_empty(false);
|
||||
let conn = db.connect_limbo();
|
||||
conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
|
||||
.unwrap();
|
||||
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
|
||||
.unwrap();
|
||||
conn.execute("INSERT INTO t VALUES (10, 10), (5, 1)")
|
||||
.unwrap();
|
||||
let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t");
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![Value::Integer(5), Value::Integer(1)],
|
||||
vec![Value::Integer(10), Value::Integer(10)],
|
||||
]
|
||||
);
|
||||
let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![
|
||||
Value::Integer(1),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(10)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(2),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(5)
|
||||
]
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cdc_crud() {
|
||||
let db = TempDatabase::new_empty(false);
|
||||
let conn = db.connect_limbo();
|
||||
conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
|
||||
.unwrap();
|
||||
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
|
||||
.unwrap();
|
||||
conn.execute("INSERT INTO t VALUES (20, 20), (10, 10), (5, 1)")
|
||||
.unwrap();
|
||||
conn.execute("UPDATE t SET y = 100 WHERE x = 5").unwrap();
|
||||
conn.execute("DELETE FROM t WHERE x > 5").unwrap();
|
||||
conn.execute("INSERT INTO t VALUES (1, 1)").unwrap();
|
||||
conn.execute("UPDATE t SET x = 2 WHERE x = 1").unwrap();
|
||||
|
||||
let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t");
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![Value::Integer(2), Value::Integer(1)],
|
||||
vec![Value::Integer(5), Value::Integer(100)],
|
||||
]
|
||||
);
|
||||
let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![
|
||||
Value::Integer(1),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(20)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(2),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(10)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(3),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(5)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(4),
|
||||
Value::Null,
|
||||
Value::Integer(0),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(5)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(5),
|
||||
Value::Null,
|
||||
Value::Integer(-1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(10)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(6),
|
||||
Value::Null,
|
||||
Value::Integer(-1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(20)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(7),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(1)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(8),
|
||||
Value::Null,
|
||||
Value::Integer(-1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(1)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(9),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(2)
|
||||
],
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cdc_failed_op() {
|
||||
let db = TempDatabase::new_empty(true);
|
||||
let conn = db.connect_limbo();
|
||||
conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
|
||||
.unwrap();
|
||||
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
|
||||
.unwrap();
|
||||
conn.execute("INSERT INTO t VALUES (1, 10), (2, 20)")
|
||||
.unwrap();
|
||||
assert!(conn
|
||||
.execute("INSERT INTO t VALUES (3, 30), (4, 40), (5, 10)")
|
||||
.is_err());
|
||||
conn.execute("INSERT INTO t VALUES (6, 60), (7, 70)")
|
||||
.unwrap();
|
||||
|
||||
let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t");
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![Value::Integer(1), Value::Integer(10)],
|
||||
vec![Value::Integer(2), Value::Integer(20)],
|
||||
vec![Value::Integer(6), Value::Integer(60)],
|
||||
vec![Value::Integer(7), Value::Integer(70)],
|
||||
]
|
||||
);
|
||||
let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![
|
||||
Value::Integer(1),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(1)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(2),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(2)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(3),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(6)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(4),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(7)
|
||||
],
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cdc_uncaptured_connection() {
|
||||
let db = TempDatabase::new_empty(true);
|
||||
let conn1 = db.connect_limbo();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
|
||||
.unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
|
||||
conn1
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
|
||||
.unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); // captured
|
||||
let conn2 = db.connect_limbo();
|
||||
conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap();
|
||||
conn2
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
|
||||
.unwrap();
|
||||
conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap(); // captured
|
||||
conn2
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('off')")
|
||||
.unwrap();
|
||||
conn2.execute("INSERT INTO t VALUES (5, 50)").unwrap();
|
||||
|
||||
conn1.execute("INSERT INTO t VALUES (6, 60)").unwrap(); // captured
|
||||
conn1
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('off')")
|
||||
.unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (7, 70)").unwrap();
|
||||
|
||||
let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t");
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![Value::Integer(1), Value::Integer(10)],
|
||||
vec![Value::Integer(2), Value::Integer(20)],
|
||||
vec![Value::Integer(3), Value::Integer(30)],
|
||||
vec![Value::Integer(4), Value::Integer(40)],
|
||||
vec![Value::Integer(5), Value::Integer(50)],
|
||||
vec![Value::Integer(6), Value::Integer(60)],
|
||||
vec![Value::Integer(7), Value::Integer(70)],
|
||||
]
|
||||
);
|
||||
let rows = replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM turso_cdc"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![
|
||||
Value::Integer(1),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(2)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(2),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(4)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(3),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(6)
|
||||
],
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cdc_custom_table() {
|
||||
let db = TempDatabase::new_empty(true);
|
||||
let conn1 = db.connect_limbo();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
|
||||
.unwrap();
|
||||
conn1
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')")
|
||||
.unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap();
|
||||
let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t");
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![Value::Integer(1), Value::Integer(10)],
|
||||
vec![Value::Integer(2), Value::Integer(20)],
|
||||
]
|
||||
);
|
||||
let rows =
|
||||
replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![
|
||||
Value::Integer(1),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(1)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(2),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(2)
|
||||
],
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cdc_ignore_changes_in_cdc_table() {
|
||||
let db = TempDatabase::new_empty(true);
|
||||
let conn1 = db.connect_limbo();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
|
||||
.unwrap();
|
||||
conn1
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')")
|
||||
.unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap();
|
||||
let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t");
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![Value::Integer(1), Value::Integer(10)],
|
||||
vec![Value::Integer(2), Value::Integer(20)],
|
||||
]
|
||||
);
|
||||
conn1
|
||||
.execute("DELETE FROM custom_cdc WHERE operation_id < 2")
|
||||
.unwrap();
|
||||
let rows =
|
||||
replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![vec![
|
||||
Value::Integer(2),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(2)
|
||||
],]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cdc_transaction() {
|
||||
let db = TempDatabase::new_empty(true);
|
||||
let conn1 = db.connect_limbo();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
|
||||
.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y UNIQUE)")
|
||||
.unwrap();
|
||||
conn1
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')")
|
||||
.unwrap();
|
||||
conn1.execute("BEGIN").unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
|
||||
conn1.execute("INSERT INTO q VALUES (2, 20)").unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (3, 30)").unwrap();
|
||||
conn1.execute("DELETE FROM t WHERE x = 1").unwrap();
|
||||
conn1.execute("UPDATE q SET y = 200 WHERE x = 2").unwrap();
|
||||
conn1.execute("COMMIT").unwrap();
|
||||
let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t");
|
||||
assert_eq!(rows, vec![vec![Value::Integer(3), Value::Integer(30)],]);
|
||||
let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM q");
|
||||
assert_eq!(rows, vec![vec![Value::Integer(2), Value::Integer(200)],]);
|
||||
let rows =
|
||||
replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![
|
||||
Value::Integer(1),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(1)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(2),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("q".to_string()),
|
||||
Value::Integer(2)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(3),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(3)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(4),
|
||||
Value::Null,
|
||||
Value::Integer(-1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(1)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(5),
|
||||
Value::Null,
|
||||
Value::Integer(0),
|
||||
Value::Text("q".to_string()),
|
||||
Value::Integer(2)
|
||||
],
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cdc_independent_connections() {
|
||||
let db = TempDatabase::new_empty(true);
|
||||
let conn1 = db.connect_limbo();
|
||||
let conn2 = db.connect_limbo();
|
||||
conn1
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')")
|
||||
.unwrap();
|
||||
conn2
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')")
|
||||
.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
|
||||
.unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
|
||||
conn2.execute("INSERT INTO t VALUES (2, 20)").unwrap();
|
||||
let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t");
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![Value::Integer(1), Value::Integer(10)],
|
||||
vec![Value::Integer(2), Value::Integer(20)]
|
||||
]
|
||||
);
|
||||
let rows =
|
||||
replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc1"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![vec![
|
||||
Value::Integer(1),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(1)
|
||||
]]
|
||||
);
|
||||
let rows =
|
||||
replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc2"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![vec![
|
||||
Value::Integer(1),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(2)
|
||||
]]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cdc_independent_connections_different_cdc_not_ignore() {
|
||||
let db = TempDatabase::new_empty(true);
|
||||
let conn1 = db.connect_limbo();
|
||||
let conn2 = db.connect_limbo();
|
||||
conn1
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')")
|
||||
.unwrap();
|
||||
conn2
|
||||
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')")
|
||||
.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
|
||||
.unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
|
||||
conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap();
|
||||
conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap();
|
||||
conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap();
|
||||
conn1
|
||||
.execute("DELETE FROM custom_cdc2 WHERE operation_id < 2")
|
||||
.unwrap();
|
||||
conn2
|
||||
.execute("DELETE FROM custom_cdc1 WHERE operation_id < 2")
|
||||
.unwrap();
|
||||
let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t");
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![Value::Integer(1), Value::Integer(10)],
|
||||
vec![Value::Integer(2), Value::Integer(20)],
|
||||
vec![Value::Integer(3), Value::Integer(30)],
|
||||
vec![Value::Integer(4), Value::Integer(40)],
|
||||
]
|
||||
);
|
||||
let rows =
|
||||
replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc1"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![
|
||||
Value::Integer(2),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(2)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(3),
|
||||
Value::Null,
|
||||
Value::Integer(-1),
|
||||
Value::Text("custom_cdc2".to_string()),
|
||||
Value::Integer(1)
|
||||
]
|
||||
]
|
||||
);
|
||||
let rows =
|
||||
replace_column_with_null(limbo_exec_rows(&db, &conn2, "SELECT * FROM custom_cdc2"), 1);
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
vec![
|
||||
Value::Integer(2),
|
||||
Value::Null,
|
||||
Value::Integer(1),
|
||||
Value::Text("t".to_string()),
|
||||
Value::Integer(4)
|
||||
],
|
||||
vec![
|
||||
Value::Integer(3),
|
||||
Value::Null,
|
||||
Value::Integer(-1),
|
||||
Value::Text("custom_cdc1".to_string()),
|
||||
Value::Integer(1)
|
||||
]
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -1362,6 +1362,23 @@ impl CreateTableBody {
|
||||
options,
|
||||
})
|
||||
}
|
||||
|
||||
/// Constructor from Vec of column definition
|
||||
pub fn columns_and_constraints_from_definition(
|
||||
columns_vec: Vec<ColumnDefinition>,
|
||||
constraints: Option<Vec<NamedTableConstraint>>,
|
||||
options: TableOptions,
|
||||
) -> Result<Self, ParserError> {
|
||||
let mut columns = IndexMap::new();
|
||||
for def in columns_vec {
|
||||
columns.insert(def.col_name.clone(), def);
|
||||
}
|
||||
Ok(Self::ColumnsAndConstraints {
|
||||
columns,
|
||||
constraints,
|
||||
options,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Table column definition
|
||||
@@ -1744,6 +1761,8 @@ pub enum PragmaName {
|
||||
SchemaVersion,
|
||||
/// returns information about the columns of a table
|
||||
TableInfo,
|
||||
/// enable capture-changes logic for the connection
|
||||
UnstableCaptureDataChangesConn,
|
||||
/// Returns the user version of the database file.
|
||||
UserVersion,
|
||||
/// trigger a checkpoint to run on database(s) if WAL is enabled
|
||||
|
||||
Reference in New Issue
Block a user