mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-17 07:04:20 +01:00
Merge 'extensions/vtabs: implement remaining opcodes' from Preston Thorpe
The only real benefit right now here is the ability to rename virtual
tables.
Then this now properly calls `VBegin` at the start of a vtab write
transaction, despite none of our extensions needing or implementing
transactions at this point.
```console
explain insert into t values ('key','value');
addr opcode p1 p2 p3 p4 p5 comment
---- ----------------- ---- ---- ---- ------------- -- -------
0 Init 0 10 0 0 Start at 10
1 VOpen 0 0 0 0 t
2 VBegin 0 0 0 0
3 Null 0 1 0 0 r[1]=NULL
4 Null 0 3 0 0 r[3]=NULL
5 String8 0 4 0 key 0 r[4]='key'
6 String8 0 5 0 value 0 r[5]='value'
7 VUpdate 0 5 1 0 args=r[1..5]
8 Close 0 0 0 0
9 Halt 0 0 0 0
10 Transaction 0 2 1 0 iDb=0 tx_mode=Write
11 Goto 0 1 0 0
Exiting Turso SQL Shell.
```
Closes #3930
This commit is contained in:
@@ -560,14 +560,14 @@ Modifiers:
|
||||
| TableLock | No | |
|
||||
| Trace | No | |
|
||||
| Transaction | Yes | |
|
||||
| VBegin | No | |
|
||||
| VBegin | Yes | |
|
||||
| VColumn | Yes | |
|
||||
| VCreate | Yes | |
|
||||
| VDestroy | Yes | |
|
||||
| VFilter | Yes | |
|
||||
| VNext | Yes | |
|
||||
| VOpen | Yes | |
|
||||
| VRename | No | |
|
||||
| VRename | Yes | |
|
||||
| VUpdate | Yes | |
|
||||
| Vacuum | No | |
|
||||
| Variable | Yes | |
|
||||
|
||||
@@ -69,6 +69,7 @@ pub use io::{
|
||||
use parking_lot::RwLock;
|
||||
use rustc_hash::FxHashMap;
|
||||
use schema::Schema;
|
||||
use std::collections::HashSet;
|
||||
use std::task::Waker;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
@@ -649,6 +650,7 @@ impl Database {
|
||||
is_mvcc_bootstrap_connection: AtomicBool::new(is_mvcc_bootstrap_connection),
|
||||
fk_pragma: AtomicBool::new(false),
|
||||
fk_deferred_violations: AtomicIsize::new(0),
|
||||
vtab_txn_states: RwLock::new(HashSet::new()),
|
||||
});
|
||||
self.n_connections
|
||||
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
@@ -1177,6 +1179,8 @@ pub struct Connection {
|
||||
/// Whether pragma foreign_keys=ON for this connection
|
||||
fk_pragma: AtomicBool,
|
||||
fk_deferred_violations: AtomicIsize,
|
||||
/// Track when each virtual table instance is currently in transaction.
|
||||
vtab_txn_states: RwLock<HashSet<u64>>,
|
||||
}
|
||||
|
||||
// SAFETY: This needs to be audited for thread safety.
|
||||
|
||||
@@ -17,6 +17,7 @@ use crate::{
|
||||
builder::{CursorType, ProgramBuilder},
|
||||
insn::{Cookie, Insn, RegisterOrLiteral},
|
||||
},
|
||||
vtab::VirtualTable,
|
||||
LimboError, Result,
|
||||
};
|
||||
|
||||
@@ -62,15 +63,26 @@ pub fn translate_alter_table(
|
||||
);
|
||||
}
|
||||
|
||||
let Some(original_btree) = resolver
|
||||
.schema
|
||||
.get_table(table_name)
|
||||
.and_then(|table| table.btree())
|
||||
else {
|
||||
let Some(table) = resolver.schema.get_table(table_name) else {
|
||||
return Err(LimboError::ParseError(format!(
|
||||
"no such table: {table_name}"
|
||||
)));
|
||||
};
|
||||
if let Some(tbl) = table.virtual_table() {
|
||||
if let ast::AlterTableBody::RenameTo(new_name) = &alter_table {
|
||||
let new_name_norm = normalize_ident(new_name.as_str());
|
||||
return translate_rename_virtual_table(
|
||||
program,
|
||||
tbl,
|
||||
table_name,
|
||||
new_name_norm,
|
||||
resolver,
|
||||
);
|
||||
}
|
||||
}
|
||||
let Some(original_btree) = table.btree() else {
|
||||
crate::bail_parse_error!("ALTER TABLE is only supported for BTree tables");
|
||||
};
|
||||
|
||||
// Check if this table has dependent materialized views
|
||||
let dependent_views = resolver.schema.get_dependent_materialized_views(table_name);
|
||||
@@ -651,3 +663,102 @@ pub fn translate_alter_table(
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn translate_rename_virtual_table(
|
||||
mut program: ProgramBuilder,
|
||||
vtab: Arc<VirtualTable>,
|
||||
old_name: &str,
|
||||
new_name_norm: String,
|
||||
resolver: &Resolver,
|
||||
) -> Result<ProgramBuilder> {
|
||||
program.begin_write_operation();
|
||||
let vtab_cur = program.alloc_cursor_id(CursorType::VirtualTable(vtab.clone()));
|
||||
program.emit_insn(Insn::VOpen {
|
||||
cursor_id: vtab_cur,
|
||||
});
|
||||
|
||||
let new_name_reg = program.emit_string8_new_reg(new_name_norm.clone());
|
||||
program.emit_insn(Insn::VRename {
|
||||
cursor_id: vtab_cur,
|
||||
new_name_reg,
|
||||
});
|
||||
// Rewrite sqlite_schema entry
|
||||
let sqlite_schema = resolver
|
||||
.schema
|
||||
.get_btree_table(SQLITE_TABLEID)
|
||||
.expect("sqlite_schema should be on schema");
|
||||
|
||||
let schema_cur = program.alloc_cursor_id(CursorType::BTreeTable(sqlite_schema.clone()));
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: schema_cur,
|
||||
root_page: RegisterOrLiteral::Literal(sqlite_schema.root_page),
|
||||
db: 0,
|
||||
});
|
||||
|
||||
program.cursor_loop(schema_cur, |program, rowid| {
|
||||
let ncols = sqlite_schema.columns.len();
|
||||
assert_eq!(ncols, 5);
|
||||
|
||||
let first_col = program.alloc_registers(ncols);
|
||||
for i in 0..ncols {
|
||||
program.emit_column_or_rowid(schema_cur, i, first_col + i);
|
||||
}
|
||||
|
||||
program.emit_string8_new_reg(old_name.to_string());
|
||||
program.mark_last_insn_constant();
|
||||
|
||||
program.emit_string8_new_reg(new_name_norm.clone());
|
||||
program.mark_last_insn_constant();
|
||||
|
||||
let out = program.alloc_registers(ncols);
|
||||
|
||||
program.emit_insn(Insn::Function {
|
||||
constant_mask: 0,
|
||||
start_reg: first_col,
|
||||
dest: out,
|
||||
func: crate::function::FuncCtx {
|
||||
func: Func::AlterTable(AlterTableFunc::RenameTable),
|
||||
arg_count: 7,
|
||||
},
|
||||
});
|
||||
|
||||
let rec = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: out,
|
||||
count: ncols,
|
||||
dest_reg: rec,
|
||||
index_name: None,
|
||||
affinity_str: None,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Insert {
|
||||
cursor: schema_cur,
|
||||
key_reg: rowid,
|
||||
record_reg: rec,
|
||||
flag: crate::vdbe::insn::InsertFlags(0),
|
||||
table_name: old_name.to_string(),
|
||||
});
|
||||
});
|
||||
|
||||
// Bump schema cookie
|
||||
program.emit_insn(Insn::SetCookie {
|
||||
db: 0,
|
||||
cookie: Cookie::SchemaVersion,
|
||||
value: resolver.schema.schema_version as i32 + 1,
|
||||
p5: 0,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::RenameTable {
|
||||
from: old_name.to_owned(),
|
||||
to: new_name_norm,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Close {
|
||||
cursor_id: schema_cur,
|
||||
});
|
||||
program.emit_insn(Insn::Close {
|
||||
cursor_id: vtab_cur,
|
||||
});
|
||||
|
||||
Ok(program)
|
||||
}
|
||||
|
||||
@@ -1905,6 +1905,9 @@ fn translate_virtual_table_insert(
|
||||
_ => crate::bail_parse_error!("Unsupported INSERT body for virtual tables"),
|
||||
};
|
||||
let table = Table::Virtual(virtual_table.clone());
|
||||
let cursor_id = program.alloc_cursor_id(CursorType::VirtualTable(virtual_table.clone()));
|
||||
program.emit_insn(Insn::VOpen { cursor_id });
|
||||
program.emit_insn(Insn::VBegin { cursor_id });
|
||||
/* *
|
||||
* Inserts for virtual tables are done in a single step.
|
||||
* argv[0] = (NULL for insert)
|
||||
@@ -1923,8 +1926,6 @@ fn translate_virtual_table_insert(
|
||||
translate_rows_single(&mut program, &value, &insertion, resolver)?;
|
||||
let conflict_action = on_conflict.as_ref().map(|c| c.bit_value()).unwrap_or(0) as u16;
|
||||
|
||||
let cursor_id = program.alloc_cursor_id(CursorType::VirtualTable(virtual_table.clone()));
|
||||
|
||||
program.emit_insn(Insn::VUpdate {
|
||||
cursor_id,
|
||||
arg_count: insertion.col_mappings.len() + 2, // +1 for NULL, +1 for rowid
|
||||
@@ -1932,6 +1933,8 @@ fn translate_virtual_table_insert(
|
||||
conflict_action,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Close { cursor_id });
|
||||
|
||||
let halt_label = program.allocate_label();
|
||||
program.resolve_label(halt_label, program.offset());
|
||||
|
||||
|
||||
@@ -316,6 +316,9 @@ pub fn init_loop(
|
||||
}
|
||||
if let Some(cursor_id) = table_cursor_id {
|
||||
program.emit_insn(Insn::VOpen { cursor_id });
|
||||
if is_write {
|
||||
program.emit_insn(Insn::VBegin { cursor_id });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1272,6 +1272,65 @@ pub fn op_vdestroy(
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub fn op_vbegin(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
insn: &Insn,
|
||||
pager: &Arc<Pager>,
|
||||
mv_store: Option<&Arc<MvStore>>,
|
||||
) -> Result<InsnFunctionStepResult> {
|
||||
load_insn!(VBegin { cursor_id }, insn);
|
||||
let cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_virtual_mut();
|
||||
let vtab_id = cursor
|
||||
.vtab_id()
|
||||
.expect("VBegin on non ext-virtual table cursor");
|
||||
let mut states = program.connection.vtab_txn_states.write();
|
||||
if states.insert(vtab_id) {
|
||||
// Only begin a new transaction if one is not already active for this virtual table module
|
||||
let vtabs = &program.connection.syms.read().vtabs;
|
||||
let vtab = vtabs
|
||||
.iter()
|
||||
.find(|p| p.1.id().eq(&vtab_id))
|
||||
.expect("Could not find virtual table for VBegin");
|
||||
vtab.1.begin()?;
|
||||
}
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub fn op_vrename(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
insn: &Insn,
|
||||
pager: &Arc<Pager>,
|
||||
mv_store: Option<&Arc<MvStore>>,
|
||||
) -> Result<InsnFunctionStepResult> {
|
||||
load_insn!(
|
||||
VRename {
|
||||
cursor_id,
|
||||
new_name_reg
|
||||
},
|
||||
insn
|
||||
);
|
||||
let name = state.registers[*new_name_reg].get_value().to_string();
|
||||
let conn = program.connection.clone();
|
||||
let cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_virtual_mut();
|
||||
let vtabs = &program.connection.syms.read().vtabs;
|
||||
let vtab = vtabs
|
||||
.iter()
|
||||
.find(|p| {
|
||||
p.1.id().eq(&cursor
|
||||
.vtab_id()
|
||||
.expect("non ext-virtual table used in VRollback"))
|
||||
})
|
||||
.expect("Could not find virtual table for VRollback");
|
||||
vtab.1.rename(&name)?;
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub fn op_open_pseudo(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
@@ -2011,6 +2070,7 @@ pub fn halt(
|
||||
if err_code > 0 {
|
||||
// Any non-FK constraint violation causes the statement subtransaction to roll back.
|
||||
state.end_statement(&program.connection, pager, EndStatement::RollbackSavepoint)?;
|
||||
vtab_rollback_all(&program.connection, state)?;
|
||||
}
|
||||
match err_code {
|
||||
0 => {}
|
||||
@@ -2062,6 +2122,7 @@ pub fn halt(
|
||||
.fk_deferred_violations
|
||||
.swap(0, Ordering::AcqRel);
|
||||
if deferred_violations > 0 {
|
||||
vtab_rollback_all(&program.connection, state)?;
|
||||
pager.rollback_tx(&program.connection);
|
||||
program.connection.set_tx_state(TransactionState::None);
|
||||
program.connection.auto_commit.store(true, Ordering::SeqCst);
|
||||
@@ -2071,6 +2132,7 @@ pub fn halt(
|
||||
}
|
||||
}
|
||||
state.end_statement(&program.connection, pager, EndStatement::ReleaseSavepoint)?;
|
||||
vtab_commit_all(&program.connection, state)?;
|
||||
program
|
||||
.commit_txn(pager.clone(), state, mv_store, false)
|
||||
.map(Into::into)
|
||||
@@ -2082,6 +2144,40 @@ pub fn halt(
|
||||
}
|
||||
}
|
||||
|
||||
/// Call xCommit on all virtual tables that participated in the current transaction.
|
||||
fn vtab_commit_all(conn: &Connection, state: &mut ProgramState) -> crate::Result<()> {
|
||||
let mut set = conn.vtab_txn_states.write();
|
||||
if set.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let reg = &conn.syms.read().vtabs;
|
||||
for id in set.drain() {
|
||||
let vtab = reg
|
||||
.iter()
|
||||
.find(|(_, vtab)| vtab.id() == id)
|
||||
.expect("vtab must exist");
|
||||
vtab.1.commit()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Rollback all virtual tables that are part of the current transaction.
|
||||
fn vtab_rollback_all(conn: &Connection, state: &mut ProgramState) -> crate::Result<()> {
|
||||
let mut set = conn.vtab_txn_states.write();
|
||||
if set.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let reg = &conn.syms.read().vtabs;
|
||||
for id in set.drain() {
|
||||
let vtab = reg
|
||||
.iter()
|
||||
.find(|(_, vtab)| vtab.id() == id)
|
||||
.expect("vtab must exist");
|
||||
vtab.1.rollback()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn op_halt(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
@@ -5419,6 +5515,32 @@ pub fn op_function(
|
||||
)
|
||||
}
|
||||
}
|
||||
ast::Stmt::CreateVirtualTable(ast::CreateVirtualTable {
|
||||
tbl_name,
|
||||
if_not_exists,
|
||||
module_name,
|
||||
args,
|
||||
}) => {
|
||||
let this_table = normalize_ident(tbl_name.name.as_str());
|
||||
if this_table != rename_from {
|
||||
None
|
||||
} else {
|
||||
let new_stmt =
|
||||
ast::Stmt::CreateVirtualTable(ast::CreateVirtualTable {
|
||||
tbl_name: ast::QualifiedName {
|
||||
db_name: tbl_name.db_name,
|
||||
name: ast::Name::exact(
|
||||
original_rename_to.to_string(),
|
||||
),
|
||||
alias: None,
|
||||
},
|
||||
if_not_exists,
|
||||
module_name,
|
||||
args,
|
||||
});
|
||||
Some(new_stmt.to_string())
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
};
|
||||
@@ -8359,23 +8481,23 @@ pub fn op_rename_table(
|
||||
.tables
|
||||
.remove(&normalized_from)
|
||||
.expect("table being renamed should be in schema");
|
||||
|
||||
{
|
||||
let table = Arc::make_mut(&mut table);
|
||||
|
||||
let Table::BTree(btree) = table else {
|
||||
panic!("only btree tables can be renamed");
|
||||
};
|
||||
let btree = Arc::make_mut(btree);
|
||||
// update this table's own foreign keys
|
||||
for fk_arc in &mut btree.foreign_keys {
|
||||
let fk = Arc::make_mut(fk_arc);
|
||||
if normalize_ident(&fk.parent_table) == normalized_from {
|
||||
fk.parent_table = normalized_to.clone();
|
||||
match Arc::make_mut(&mut table) {
|
||||
Table::BTree(btree) => {
|
||||
let btree = Arc::make_mut(btree);
|
||||
// update this table's own foreign keys
|
||||
for fk_arc in &mut btree.foreign_keys {
|
||||
let fk = Arc::make_mut(fk_arc);
|
||||
if normalize_ident(&fk.parent_table) == normalized_from {
|
||||
fk.parent_table = normalized_to.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
btree.name = normalized_to.to_owned();
|
||||
btree.name = normalized_to.to_owned();
|
||||
}
|
||||
Table::Virtual(vtab) => {
|
||||
Arc::make_mut(vtab).name = normalized_to.clone();
|
||||
}
|
||||
_ => panic!("only btree and virtual tables can be renamed"),
|
||||
}
|
||||
|
||||
schema.tables.insert(normalized_to.to_owned(), table);
|
||||
|
||||
@@ -488,6 +488,24 @@ pub fn insn_to_row(
|
||||
0,
|
||||
"".to_string(),
|
||||
),
|
||||
Insn::VBegin{cursor_id} => (
|
||||
"VBegin",
|
||||
*cursor_id as i32,
|
||||
0,
|
||||
0,
|
||||
Value::build_text(""),
|
||||
0,
|
||||
"".into()
|
||||
),
|
||||
Insn::VRename{cursor_id, new_name_reg} => (
|
||||
"VRename",
|
||||
*cursor_id as i32,
|
||||
*new_name_reg as i32,
|
||||
0,
|
||||
Value::build_text(""),
|
||||
0,
|
||||
"".into(),
|
||||
),
|
||||
Insn::OpenPseudo {
|
||||
cursor_id,
|
||||
content_reg,
|
||||
|
||||
@@ -413,6 +413,16 @@ pub enum Insn {
|
||||
/// The database within which this virtual table needs to be destroyed (P1).
|
||||
db: usize,
|
||||
},
|
||||
VBegin {
|
||||
/// The database within which this virtual table transaction needs to begin (P1).
|
||||
cursor_id: CursorID,
|
||||
},
|
||||
VRename {
|
||||
/// The database within which this virtual table needs to be renamed (P1).
|
||||
cursor_id: CursorID,
|
||||
/// New name of the virtual table (P2).
|
||||
new_name_reg: usize,
|
||||
},
|
||||
|
||||
/// Open a cursor for a pseudo-table that contains a single row.
|
||||
OpenPseudo {
|
||||
@@ -1384,6 +1394,8 @@ impl InsnVariants {
|
||||
InsnVariants::SequenceTest => execute::op_sequence_test,
|
||||
InsnVariants::FkCounter => execute::op_fk_counter,
|
||||
InsnVariants::FkIfZero => execute::op_fk_if_zero,
|
||||
InsnVariants::VBegin => execute::op_vbegin,
|
||||
InsnVariants::VRename => execute::op_vrename,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
126
core/vtab.rs
126
core/vtab.rs
@@ -5,7 +5,7 @@ use crate::{Connection, LimboError, SymbolTable, Value};
|
||||
use parking_lot::RwLock;
|
||||
use std::ffi::c_void;
|
||||
use std::ptr::NonNull;
|
||||
use std::sync::atomic::AtomicPtr;
|
||||
use std::sync::atomic::{AtomicPtr, Ordering};
|
||||
use std::sync::Arc;
|
||||
use turso_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VTabModuleImpl};
|
||||
use turso_parser::{ast, parser::Parser};
|
||||
@@ -23,9 +23,14 @@ pub struct VirtualTable {
|
||||
pub(crate) columns: Vec<Column>,
|
||||
pub(crate) kind: VTabKind,
|
||||
vtab_type: VirtualTableType,
|
||||
// identifier to tie a cursor to a specific instantiated virtual table instance
|
||||
vtab_id: u64,
|
||||
}
|
||||
|
||||
impl VirtualTable {
|
||||
pub(crate) fn id(&self) -> u64 {
|
||||
self.vtab_id
|
||||
}
|
||||
pub(crate) fn readonly(self: &Arc<VirtualTable>) -> bool {
|
||||
match &self.vtab_type {
|
||||
VirtualTableType::Pragma(_) => true,
|
||||
@@ -44,6 +49,7 @@ impl VirtualTable {
|
||||
.expect("pragma table-valued function schema resolution should not fail"),
|
||||
kind: VTabKind::TableValuedFunction,
|
||||
vtab_type: VirtualTableType::Pragma(tab),
|
||||
vtab_id: 0,
|
||||
};
|
||||
Arc::new(vtab)
|
||||
})
|
||||
@@ -67,6 +73,7 @@ impl VirtualTable {
|
||||
.expect("internal table-valued function schema resolution should not fail"),
|
||||
kind: VTabKind::TableValuedFunction,
|
||||
vtab_type: VirtualTableType::Internal(Arc::new(RwLock::new(json_each))),
|
||||
vtab_id: 0,
|
||||
};
|
||||
|
||||
let json_tree = JsonVirtualTable::json_tree();
|
||||
@@ -77,6 +84,7 @@ impl VirtualTable {
|
||||
.expect("internal table-valued function schema resolution should not fail"),
|
||||
kind: VTabKind::TableValuedFunction,
|
||||
vtab_type: VirtualTableType::Internal(Arc::new(RwLock::new(json_tree))),
|
||||
vtab_id: 0,
|
||||
};
|
||||
|
||||
vec![
|
||||
@@ -101,6 +109,7 @@ impl VirtualTable {
|
||||
columns: Self::resolve_columns(schema)?,
|
||||
kind: VTabKind::TableValuedFunction,
|
||||
vtab_type,
|
||||
vtab_id: 0,
|
||||
};
|
||||
Ok(Arc::new(vtab))
|
||||
}
|
||||
@@ -119,6 +128,7 @@ impl VirtualTable {
|
||||
columns: Self::resolve_columns(schema)?,
|
||||
kind: VTabKind::VirtualTable,
|
||||
vtab_type: VirtualTableType::External(table),
|
||||
vtab_id: VTAB_ID_COUNTER.fetch_add(1, Ordering::Acquire),
|
||||
};
|
||||
Ok(Arc::new(vtab))
|
||||
}
|
||||
@@ -141,9 +151,9 @@ impl VirtualTable {
|
||||
VirtualTableType::Pragma(table) => {
|
||||
Ok(VirtualTableCursor::Pragma(Box::new(table.open(conn)?)))
|
||||
}
|
||||
VirtualTableType::External(table) => {
|
||||
Ok(VirtualTableCursor::External(table.open(conn.clone())?))
|
||||
}
|
||||
VirtualTableType::External(table) => Ok(VirtualTableCursor::External(
|
||||
table.open(conn.clone(), self.vtab_id)?,
|
||||
)),
|
||||
VirtualTableType::Internal(table) => {
|
||||
Ok(VirtualTableCursor::Internal(table.read().open(conn)?))
|
||||
}
|
||||
@@ -177,6 +187,54 @@ impl VirtualTable {
|
||||
VirtualTableType::Internal(table) => table.read().best_index(constraints, order_by),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn begin(&self) -> crate::Result<()> {
|
||||
match &self.vtab_type {
|
||||
VirtualTableType::Pragma(_) => Err(LimboError::ExtensionError(
|
||||
"Pragma virtual tables do not support transactions".to_string(),
|
||||
)),
|
||||
VirtualTableType::External(table) => table.begin(),
|
||||
VirtualTableType::Internal(_) => Err(LimboError::ExtensionError(
|
||||
"Internal virtual tables currently do not support transactions".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn commit(&self) -> crate::Result<()> {
|
||||
match &self.vtab_type {
|
||||
VirtualTableType::Pragma(_) => Err(LimboError::ExtensionError(
|
||||
"Pragma virtual tables do not support transactions".to_string(),
|
||||
)),
|
||||
VirtualTableType::External(table) => table.commit(),
|
||||
VirtualTableType::Internal(_) => Err(LimboError::ExtensionError(
|
||||
"Internal virtual tables currently do not support transactions".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn rollback(&self) -> crate::Result<()> {
|
||||
match &self.vtab_type {
|
||||
VirtualTableType::Pragma(_) => Err(LimboError::ExtensionError(
|
||||
"Pragma virtual tables do not support transactions".to_string(),
|
||||
)),
|
||||
VirtualTableType::External(table) => table.rollback(),
|
||||
VirtualTableType::Internal(_) => Err(LimboError::ExtensionError(
|
||||
"Internal virtual tables currently do not support transactions".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn rename(&self, new_name: &str) -> crate::Result<()> {
|
||||
match &self.vtab_type {
|
||||
VirtualTableType::Pragma(_) => Err(LimboError::ExtensionError(
|
||||
"Pragma virtual tables do not support renaming".to_string(),
|
||||
)),
|
||||
VirtualTableType::External(table) => table.rename(new_name),
|
||||
VirtualTableType::Internal(_) => Err(LimboError::ExtensionError(
|
||||
"Internal virtual tables currently do not support renaming".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum VirtualTableCursor {
|
||||
@@ -225,6 +283,14 @@ impl VirtualTableCursor {
|
||||
VirtualTableCursor::Internal(cursor) => cursor.write().filter(&args, idx_str, idx_num),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn vtab_id(&self) -> Option<u64> {
|
||||
match self {
|
||||
VirtualTableCursor::Pragma(_) => None,
|
||||
VirtualTableCursor::External(cursor) => cursor.vtab_id.into(),
|
||||
VirtualTableCursor::Internal(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -232,6 +298,7 @@ pub(crate) struct ExtVirtualTable {
|
||||
implementation: Arc<VTabModuleImpl>,
|
||||
table_ptr: AtomicPtr<c_void>,
|
||||
}
|
||||
static VTAB_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
|
||||
|
||||
impl Clone for ExtVirtualTable {
|
||||
fn clone(&self) -> Self {
|
||||
@@ -290,7 +357,7 @@ impl ExtVirtualTable {
|
||||
|
||||
/// Accepts a pointer connection that owns the VTable, that the module
|
||||
/// can optionally use to query the other tables.
|
||||
fn open(&self, conn: Arc<Connection>) -> crate::Result<ExtVirtualTableCursor> {
|
||||
fn open(&self, conn: Arc<Connection>, id: u64) -> crate::Result<ExtVirtualTableCursor> {
|
||||
// we need a Weak<Connection> to upgrade and call from the extension.
|
||||
let weak = Arc::downgrade(&conn);
|
||||
let weak_box = Box::into_raw(Box::new(weak));
|
||||
@@ -309,7 +376,7 @@ impl ExtVirtualTable {
|
||||
}) else {
|
||||
return Err(LimboError::ExtensionError("Open returned null".to_string()));
|
||||
};
|
||||
ExtVirtualTableCursor::new(cursor, ext_conn_ptr, self.implementation.clone())
|
||||
ExtVirtualTableCursor::new(cursor, ext_conn_ptr, self.implementation.clone(), id)
|
||||
}
|
||||
|
||||
fn update(&self, args: &[Value]) -> crate::Result<Option<i64>> {
|
||||
@@ -347,6 +414,50 @@ impl ExtVirtualTable {
|
||||
_ => Err(LimboError::ExtensionError(rc.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
fn commit(&self) -> crate::Result<()> {
|
||||
let rc = unsafe {
|
||||
(self.implementation.commit)(self.table_ptr.load(std::sync::atomic::Ordering::SeqCst))
|
||||
};
|
||||
match rc {
|
||||
ResultCode::OK => Ok(()),
|
||||
_ => Err(LimboError::ExtensionError("Commit failed".to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
fn begin(&self) -> crate::Result<()> {
|
||||
let rc = unsafe {
|
||||
(self.implementation.begin)(self.table_ptr.load(std::sync::atomic::Ordering::SeqCst))
|
||||
};
|
||||
match rc {
|
||||
ResultCode::OK => Ok(()),
|
||||
_ => Err(LimboError::ExtensionError("Begin failed".to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
fn rollback(&self) -> crate::Result<()> {
|
||||
let rc = unsafe {
|
||||
(self.implementation.rollback)(self.table_ptr.load(std::sync::atomic::Ordering::SeqCst))
|
||||
};
|
||||
match rc {
|
||||
ResultCode::OK => Ok(()),
|
||||
_ => Err(LimboError::ExtensionError("Rollback failed".to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
fn rename(&self, new_name: &str) -> crate::Result<()> {
|
||||
let c_new_name = std::ffi::CString::new(new_name).unwrap();
|
||||
let rc = unsafe {
|
||||
(self.implementation.rename)(
|
||||
self.table_ptr.load(std::sync::atomic::Ordering::SeqCst),
|
||||
c_new_name.as_ptr(),
|
||||
)
|
||||
};
|
||||
match rc {
|
||||
ResultCode::OK => Ok(()),
|
||||
_ => Err(LimboError::ExtensionError("Rename failed".to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ExtVirtualTableCursor {
|
||||
@@ -355,6 +466,7 @@ pub struct ExtVirtualTableCursor {
|
||||
// query other internal tables.
|
||||
conn_ptr: Option<NonNull<turso_ext::Conn>>,
|
||||
implementation: Arc<VTabModuleImpl>,
|
||||
vtab_id: u64,
|
||||
}
|
||||
|
||||
impl ExtVirtualTableCursor {
|
||||
@@ -362,11 +474,13 @@ impl ExtVirtualTableCursor {
|
||||
cursor: NonNull<c_void>,
|
||||
conn_ptr: NonNull<turso_ext::Conn>,
|
||||
implementation: Arc<VTabModuleImpl>,
|
||||
id: u64,
|
||||
) -> crate::Result<Self> {
|
||||
Ok(Self {
|
||||
cursor,
|
||||
conn_ptr: Some(conn_ptr),
|
||||
implementation,
|
||||
vtab_id: id,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,10 @@ pub struct VTabModuleImpl {
|
||||
pub rowid: VtabRowIDFn,
|
||||
pub destroy: VtabFnDestroy,
|
||||
pub best_idx: BestIdxFn,
|
||||
pub begin: VtabBegin,
|
||||
pub commit: VtabCommit,
|
||||
pub rollback: VtabRollback,
|
||||
pub rename: VtabRename,
|
||||
}
|
||||
|
||||
// SAFETY: VTabModuleImpl contains function pointers and a name pointer that are
|
||||
@@ -108,6 +112,12 @@ pub type VtabFnUpdate = unsafe extern "C" fn(
|
||||
|
||||
pub type VtabFnDestroy = unsafe extern "C" fn(table: *const c_void) -> ResultCode;
|
||||
|
||||
pub type VtabBegin = unsafe extern "C" fn(table: *mut c_void) -> ResultCode;
|
||||
pub type VtabCommit = unsafe extern "C" fn(table: *mut c_void) -> ResultCode;
|
||||
pub type VtabRollback = unsafe extern "C" fn(table: *mut c_void) -> ResultCode;
|
||||
pub type VtabRename =
|
||||
unsafe extern "C" fn(table: *mut c_void, new_name: *const c_char) -> ResultCode;
|
||||
|
||||
pub type BestIdxFn = unsafe extern "C" fn(
|
||||
constraints: *const ConstraintInfo,
|
||||
constraint_len: i32,
|
||||
@@ -140,9 +150,21 @@ pub trait VTable {
|
||||
/// 'conn' is an Option to allow for testing. Otherwise a valid connection to the core database
|
||||
/// that created the virtual table will be available to use in your extension here.
|
||||
fn open(&self, _conn: Option<Arc<Connection>>) -> Result<Self::Cursor, Self::Error>;
|
||||
fn begin(&mut self) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
fn commit(&mut self) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
fn rollback(&mut self) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
fn update(&mut self, _rowid: i64, _args: &[Value]) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
fn rename(&mut self, _new_name: &str) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
fn insert(&mut self, _args: &[Value]) -> Result<i64, Self::Error> {
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@ impl VTabModule for KVStoreVTabModule {
|
||||
schema,
|
||||
KVStoreTable {
|
||||
store: Rc::new(RefCell::new(BTreeMap::new())),
|
||||
in_tx: false,
|
||||
},
|
||||
))
|
||||
}
|
||||
@@ -152,6 +153,7 @@ impl VTabCursor for KVStoreCursor {
|
||||
|
||||
pub struct KVStoreTable {
|
||||
store: Store,
|
||||
in_tx: bool,
|
||||
}
|
||||
|
||||
impl VTable for KVStoreTable {
|
||||
@@ -167,6 +169,31 @@ impl VTable for KVStoreTable {
|
||||
})
|
||||
}
|
||||
|
||||
fn begin(&mut self) -> Result<(), Self::Error> {
|
||||
assert!(!self.in_tx, "Already in a transaction");
|
||||
self.in_tx = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn commit(&mut self) -> Result<(), Self::Error> {
|
||||
assert!(self.in_tx, "Not in a transaction");
|
||||
self.in_tx = false;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rollback(&mut self) -> Result<(), Self::Error> {
|
||||
assert!(self.in_tx, "Not in a transaction");
|
||||
self.in_tx = false;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rename(&mut self, new_name: &str) -> Result<(), Self::Error> {
|
||||
// not a real extension of course, just asserting test in
|
||||
// testing/cli_tests/extensions.py
|
||||
assert_eq!(new_name, "renamed");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn best_index(
|
||||
constraints: &[ConstraintInfo],
|
||||
_order_by: &[OrderByInfo],
|
||||
|
||||
@@ -18,6 +18,10 @@ pub fn derive_vtab_module(input: TokenStream) -> TokenStream {
|
||||
let rowid_fn_name = format_ident!("rowid_{}", struct_name);
|
||||
let destroy_fn_name = format_ident!("destroy_{}", struct_name);
|
||||
let best_idx_fn_name = format_ident!("best_idx_{}", struct_name);
|
||||
let begin_fn_name = format_ident!("begin_{}", struct_name);
|
||||
let rollback_fn_name = format_ident!("rollback_{}", struct_name);
|
||||
let commit_fn_name = format_ident!("commit_{}", struct_name);
|
||||
let rename_fn_name = format_ident!("rename_{}", struct_name);
|
||||
|
||||
let expanded = quote! {
|
||||
impl #struct_name {
|
||||
@@ -227,6 +231,75 @@ pub fn derive_vtab_module(input: TokenStream) -> TokenStream {
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn #begin_fn_name(
|
||||
table: *mut ::std::ffi::c_void,
|
||||
) -> ::turso_ext::ResultCode {
|
||||
let table = if table.is_null() {
|
||||
return ::turso_ext::ResultCode::Error;
|
||||
} else {
|
||||
&mut *(table as *mut <#struct_name as ::turso_ext::VTabModule>::Table)
|
||||
};
|
||||
if <#struct_name as ::turso_ext::VTabModule>::Table::begin(table).is_err() {
|
||||
return ::turso_ext::ResultCode::Error;
|
||||
}
|
||||
::turso_ext::ResultCode::OK
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn #rollback_fn_name(
|
||||
table: *mut ::std::ffi::c_void,
|
||||
) -> ::turso_ext::ResultCode {
|
||||
let table = if table.is_null() {
|
||||
return ::turso_ext::ResultCode::Error;
|
||||
} else {
|
||||
&mut *(table as *mut <#struct_name as ::turso_ext::VTabModule>::Table)
|
||||
};
|
||||
if <#struct_name as ::turso_ext::VTabModule>::Table::rollback(table).is_err() {
|
||||
return ::turso_ext::ResultCode::Error;
|
||||
}
|
||||
::turso_ext::ResultCode::OK
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn #commit_fn_name(
|
||||
table: *mut ::std::ffi::c_void,
|
||||
) -> ::turso_ext::ResultCode {
|
||||
let table = if table.is_null() {
|
||||
return ::turso_ext::ResultCode::Error;
|
||||
} else {
|
||||
&mut *(table as *mut <#struct_name as ::turso_ext::VTabModule>::Table)
|
||||
};
|
||||
if <#struct_name as ::turso_ext::VTabModule>::Table::commit(table).is_err() {
|
||||
return ::turso_ext::ResultCode::Error;
|
||||
}
|
||||
::turso_ext::ResultCode::OK
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn #rename_fn_name(
|
||||
table: *mut ::std::ffi::c_void,
|
||||
name: *const ::std::ffi::c_char,
|
||||
) -> ::turso_ext::ResultCode {
|
||||
let table = if table.is_null() {
|
||||
return ::turso_ext::ResultCode::Error;
|
||||
} else {
|
||||
&mut *(table as *mut <#struct_name as ::turso_ext::VTabModule>::Table)
|
||||
};
|
||||
let name_str = if name.is_null() {
|
||||
return ::turso_ext::ResultCode::Error;
|
||||
} else {
|
||||
match ::std::ffi::CStr::from_ptr(name).to_str() {
|
||||
Ok(s) => s,
|
||||
Err(_) => return ::turso_ext::ResultCode::Error,
|
||||
}
|
||||
};
|
||||
if <#struct_name as ::turso_ext::VTabModule>::Table::rename(table, name_str).is_err() {
|
||||
return ::turso_ext::ResultCode::Error;
|
||||
}
|
||||
::turso_ext::ResultCode::OK
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn #register_fn_name(
|
||||
api: *const ::turso_ext::ExtensionApi
|
||||
@@ -251,6 +324,10 @@ pub fn derive_vtab_module(input: TokenStream) -> TokenStream {
|
||||
rowid: Self::#rowid_fn_name,
|
||||
destroy: Self::#destroy_fn_name,
|
||||
best_idx: Self::#best_idx_fn_name,
|
||||
begin: Self::#begin_fn_name,
|
||||
rollback: Self::#rollback_fn_name,
|
||||
commit: Self::#commit_fn_name,
|
||||
rename: Self::#rename_fn_name,
|
||||
};
|
||||
(api.register_vtab_module)(api.ctx, name_c, module, <#struct_name as ::turso_ext::VTabModule>::VTAB_KIND)
|
||||
}
|
||||
|
||||
@@ -490,6 +490,12 @@ def _test_kv(exec_name, ext_path):
|
||||
"select * from t a, other b where b.c = 23 and a.key='100';",
|
||||
lambda res: "100|updated2|23|32|23" == res,
|
||||
)
|
||||
turso.run_test_fn("alter table t rename to renamed;", lambda res: "" == res, "can rename virtual table")
|
||||
turso.run_test_fn(
|
||||
"select sql from sqlite_schema where name = 'renamed';",
|
||||
lambda res: "CREATE VIRTUAL TABLE renamed USING kv_store ()",
|
||||
"renamed table shows up in sqlite_schema",
|
||||
)
|
||||
turso.quit()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user