Add support for renaming virtual tables

This commit is contained in:
PThorpe92
2025-11-09 10:36:28 -05:00
parent 40f7ddb28e
commit f35ccfba17
8 changed files with 370 additions and 115 deletions

View File

@@ -69,6 +69,7 @@ pub use io::{
use parking_lot::RwLock; use parking_lot::RwLock;
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use schema::Schema; use schema::Schema;
use std::collections::HashSet;
use std::task::Waker; use std::task::Waker;
use std::{ use std::{
borrow::Cow, borrow::Cow,
@@ -649,6 +650,7 @@ impl Database {
is_mvcc_bootstrap_connection: AtomicBool::new(is_mvcc_bootstrap_connection), is_mvcc_bootstrap_connection: AtomicBool::new(is_mvcc_bootstrap_connection),
fk_pragma: AtomicBool::new(false), fk_pragma: AtomicBool::new(false),
fk_deferred_violations: AtomicIsize::new(0), fk_deferred_violations: AtomicIsize::new(0),
vtab_txn_states: RwLock::new(HashSet::new()),
}); });
self.n_connections self.n_connections
.fetch_add(1, std::sync::atomic::Ordering::SeqCst); .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
@@ -1177,6 +1179,8 @@ pub struct Connection {
/// Whether pragma foreign_keys=ON for this connection /// Whether pragma foreign_keys=ON for this connection
fk_pragma: AtomicBool, fk_pragma: AtomicBool,
fk_deferred_violations: AtomicIsize, fk_deferred_violations: AtomicIsize,
/// Whether each virtual table module is currently in transaction.
vtab_txn_states: RwLock<HashSet<u64>>,
} }
// SAFETY: This needs to be audited for thread safety. // SAFETY: This needs to be audited for thread safety.

View File

@@ -17,6 +17,7 @@ use crate::{
builder::{CursorType, ProgramBuilder}, builder::{CursorType, ProgramBuilder},
insn::{Cookie, Insn, RegisterOrLiteral}, insn::{Cookie, Insn, RegisterOrLiteral},
}, },
vtab::VirtualTable,
LimboError, Result, LimboError, Result,
}; };
@@ -62,15 +63,26 @@ pub fn translate_alter_table(
); );
} }
let Some(original_btree) = resolver let Some(table) = resolver.schema.get_table(table_name) else {
.schema
.get_table(table_name)
.and_then(|table| table.btree())
else {
return Err(LimboError::ParseError(format!( return Err(LimboError::ParseError(format!(
"no such table: {table_name}" "no such table: {table_name}"
))); )));
}; };
if let Some(tbl) = table.virtual_table() {
if let ast::AlterTableBody::RenameTo(new_name) = &alter_table {
let new_name_norm = normalize_ident(new_name.as_str());
return translate_rename_virtual_table(
program,
tbl,
table_name,
new_name_norm,
resolver,
);
}
}
let Some(original_btree) = table.btree() else {
crate::bail_parse_error!("ALTER TABLE is only supported for BTree tables");
};
// Check if this table has dependent materialized views // Check if this table has dependent materialized views
let dependent_views = resolver.schema.get_dependent_materialized_views(table_name); let dependent_views = resolver.schema.get_dependent_materialized_views(table_name);
@@ -651,3 +663,102 @@ pub fn translate_alter_table(
} }
}) })
} }
fn translate_rename_virtual_table(
mut program: ProgramBuilder,
vtab: Arc<VirtualTable>,
old_name: &str,
new_name_norm: String,
resolver: &Resolver,
) -> Result<ProgramBuilder> {
program.begin_write_operation();
let vtab_cur = program.alloc_cursor_id(CursorType::VirtualTable(vtab.clone()));
program.emit_insn(Insn::VOpen {
cursor_id: vtab_cur,
});
let new_name_reg = program.emit_string8_new_reg(new_name_norm.clone());
program.emit_insn(Insn::VRename {
cursor_id: vtab_cur,
new_name_reg,
});
// Rewrite sqlite_schema entry
let sqlite_schema = resolver
.schema
.get_btree_table(SQLITE_TABLEID)
.expect("sqlite_schema should be on schema");
let schema_cur = program.alloc_cursor_id(CursorType::BTreeTable(sqlite_schema.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: schema_cur,
root_page: RegisterOrLiteral::Literal(sqlite_schema.root_page),
db: 0,
});
program.cursor_loop(schema_cur, |program, rowid| {
let ncols = sqlite_schema.columns.len();
assert_eq!(ncols, 5);
let first_col = program.alloc_registers(ncols);
for i in 0..ncols {
program.emit_column_or_rowid(schema_cur, i, first_col + i);
}
program.emit_string8_new_reg(old_name.to_string());
program.mark_last_insn_constant();
program.emit_string8_new_reg(new_name_norm.clone());
program.mark_last_insn_constant();
let out = program.alloc_registers(ncols);
program.emit_insn(Insn::Function {
constant_mask: 0,
start_reg: first_col,
dest: out,
func: crate::function::FuncCtx {
func: Func::AlterTable(AlterTableFunc::RenameTable),
arg_count: 7,
},
});
let rec = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: out,
count: ncols,
dest_reg: rec,
index_name: None,
affinity_str: None,
});
program.emit_insn(Insn::Insert {
cursor: schema_cur,
key_reg: rowid,
record_reg: rec,
flag: crate::vdbe::insn::InsertFlags(0),
table_name: old_name.to_string(),
});
});
// Bump schema cookie
program.emit_insn(Insn::SetCookie {
db: 0,
cookie: Cookie::SchemaVersion,
value: resolver.schema.schema_version as i32 + 1,
p5: 0,
});
program.emit_insn(Insn::RenameTable {
from: old_name.to_owned(),
to: new_name_norm,
});
program.emit_insn(Insn::Close {
cursor_id: schema_cur,
});
program.emit_insn(Insn::Close {
cursor_id: vtab_cur,
});
Ok(program)
}

View File

@@ -1904,6 +1904,9 @@ fn translate_virtual_table_insert(
_ => crate::bail_parse_error!("Unsupported INSERT body for virtual tables"), _ => crate::bail_parse_error!("Unsupported INSERT body for virtual tables"),
}; };
let table = Table::Virtual(virtual_table.clone()); let table = Table::Virtual(virtual_table.clone());
let cursor_id = program.alloc_cursor_id(CursorType::VirtualTable(virtual_table.clone()));
program.emit_insn(Insn::VOpen { cursor_id });
program.emit_insn(Insn::VBegin { cursor_id });
/* * /* *
* Inserts for virtual tables are done in a single step. * Inserts for virtual tables are done in a single step.
* argv[0] = (NULL for insert) * argv[0] = (NULL for insert)
@@ -1922,8 +1925,6 @@ fn translate_virtual_table_insert(
translate_rows_single(&mut program, &value, &insertion, resolver)?; translate_rows_single(&mut program, &value, &insertion, resolver)?;
let conflict_action = on_conflict.as_ref().map(|c| c.bit_value()).unwrap_or(0) as u16; let conflict_action = on_conflict.as_ref().map(|c| c.bit_value()).unwrap_or(0) as u16;
let cursor_id = program.alloc_cursor_id(CursorType::VirtualTable(virtual_table.clone()));
program.emit_insn(Insn::VUpdate { program.emit_insn(Insn::VUpdate {
cursor_id, cursor_id,
arg_count: insertion.col_mappings.len() + 2, // +1 for NULL, +1 for rowid arg_count: insertion.col_mappings.len() + 2, // +1 for NULL, +1 for rowid
@@ -1931,6 +1932,8 @@ fn translate_virtual_table_insert(
conflict_action, conflict_action,
}); });
program.emit_insn(Insn::Close { cursor_id });
let halt_label = program.allocate_label(); let halt_label = program.allocate_label();
program.resolve_label(halt_label, program.offset()); program.resolve_label(halt_label, program.offset());

View File

@@ -313,6 +313,9 @@ pub fn init_loop(
} }
if let Some(cursor_id) = table_cursor_id { if let Some(cursor_id) = table_cursor_id {
program.emit_insn(Insn::VOpen { cursor_id }); program.emit_insn(Insn::VOpen { cursor_id });
if is_write {
program.emit_insn(Insn::VBegin { cursor_id });
}
} }
} }
} }

View File

@@ -1417,7 +1417,20 @@ pub fn op_vbegin(
load_insn!(VBegin { cursor_id }, insn); load_insn!(VBegin { cursor_id }, insn);
let cursor = state.get_cursor(*cursor_id); let cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_virtual_mut(); let cursor = cursor.as_virtual_mut();
cursor.begin()?; let vtab_id = cursor
.vtab_id()
.expect("VBegin on non ext-virtual table cursor");
let mut states = program.connection.vtab_txn_states.write();
if states.insert(vtab_id) {
// A transaction is already active for this virtual table module
// Only begin a new transaction if one is not already active for this virtual table module
let vtabs = &program.connection.syms.read().vtabs;
let vtab = vtabs
.iter()
.find(|p| p.1.id().eq(&vtab_id))
.expect("Could not find virtual table for VBegin");
vtab.1.begin()?;
}
state.pc += 1; state.pc += 1;
Ok(InsnFunctionStepResult::Step) Ok(InsnFunctionStepResult::Step)
} }
@@ -1432,7 +1445,16 @@ pub fn op_vcommit(
load_insn!(VCommit { cursor_id }, insn); load_insn!(VCommit { cursor_id }, insn);
let cursor = state.get_cursor(*cursor_id); let cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_virtual_mut(); let cursor = cursor.as_virtual_mut();
cursor.commit()?; let vtab_id = cursor
.vtab_id()
.expect("VCommit on non ext-virtual table cursor");
let vtabs = &program.connection.syms.read().vtabs;
let vtab = vtabs
.iter()
.find(|p| p.1.id().eq(&vtab_id))
.expect("Could not find virtual table for VCommit");
program.connection.vtab_txn_states.write().remove(&vtab_id);
vtab.1.commit()?;
state.pc += 1; state.pc += 1;
Ok(InsnFunctionStepResult::Step) Ok(InsnFunctionStepResult::Step)
} }
@@ -1447,7 +1469,16 @@ pub fn op_vrollback(
load_insn!(VRollback { cursor_id }, insn); load_insn!(VRollback { cursor_id }, insn);
let cursor = state.get_cursor(*cursor_id); let cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_virtual_mut(); let cursor = cursor.as_virtual_mut();
cursor.rollback()?; let vtabs = &program.connection.syms.read().vtabs;
let vtab = vtabs
.iter()
.find(|p| {
p.1.id().eq(&cursor
.vtab_id()
.expect("non ext-virtual table used in VRollback"))
})
.expect("Could not find virtual table for VRollback");
vtab.1.rollback()?;
state.pc += 1; state.pc += 1;
Ok(InsnFunctionStepResult::Step) Ok(InsnFunctionStepResult::Step)
} }
@@ -1462,14 +1493,24 @@ pub fn op_vrename(
load_insn!( load_insn!(
VRename { VRename {
cursor_id, cursor_id,
new_table_name new_name_reg
}, },
insn insn
); );
let name = state.registers[*new_name_reg].get_value().to_string();
let conn = program.connection.clone(); let conn = program.connection.clone();
let cursor = state.get_cursor(*cursor_id); let cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_virtual_mut(); let cursor = cursor.as_virtual_mut();
cursor.rename(&new_table_name)?; let vtabs = &program.connection.syms.read().vtabs;
let vtab = vtabs
.iter()
.find(|p| {
p.1.id().eq(&cursor
.vtab_id()
.expect("non ext-virtual table used in VRollback"))
})
.expect("Could not find virtual table for VRollback");
vtab.1.rename(&name)?;
state.pc += 1; state.pc += 1;
Ok(InsnFunctionStepResult::Step) Ok(InsnFunctionStepResult::Step)
} }
@@ -2264,6 +2305,7 @@ pub fn halt(
.fk_deferred_violations .fk_deferred_violations
.swap(0, Ordering::AcqRel); .swap(0, Ordering::AcqRel);
if deferred_violations > 0 { if deferred_violations > 0 {
vtab_rollback_all(&program.connection, state)?;
pager.rollback_tx(&program.connection); pager.rollback_tx(&program.connection);
program.connection.set_tx_state(TransactionState::None); program.connection.set_tx_state(TransactionState::None);
program.connection.auto_commit.store(true, Ordering::SeqCst); program.connection.auto_commit.store(true, Ordering::SeqCst);
@@ -2273,6 +2315,7 @@ pub fn halt(
} }
} }
state.end_statement(&program.connection, pager, EndStatement::ReleaseSavepoint)?; state.end_statement(&program.connection, pager, EndStatement::ReleaseSavepoint)?;
vtab_commit_all(&program.connection, state)?;
program program
.commit_txn(pager.clone(), state, mv_store, false) .commit_txn(pager.clone(), state, mv_store, false)
.map(Into::into) .map(Into::into)
@@ -2284,6 +2327,38 @@ pub fn halt(
} }
} }
fn vtab_commit_all(conn: &Connection, state: &mut ProgramState) -> crate::Result<()> {
let mut set = conn.vtab_txn_states.write();
if set.is_empty() {
return Ok(());
}
let reg = &conn.syms.read().vtabs;
for id in set.drain() {
let vtab = reg
.iter()
.find(|(_, vtab)| vtab.id() == id)
.expect("vtab must exist");
vtab.1.commit()?;
}
Ok(())
}
fn vtab_rollback_all(conn: &Connection, state: &mut ProgramState) -> crate::Result<()> {
let mut set = conn.vtab_txn_states.write();
if set.is_empty() {
return Ok(());
}
let reg = &conn.syms.read().vtabs;
for id in set.drain() {
let vtab = reg
.iter()
.find(|(_, vtab)| vtab.id() == id)
.expect("vtab must exist");
vtab.1.rollback()?;
}
Ok(())
}
pub fn op_halt( pub fn op_halt(
program: &Program, program: &Program,
state: &mut ProgramState, state: &mut ProgramState,
@@ -5614,6 +5689,32 @@ pub fn op_function(
) )
} }
} }
ast::Stmt::CreateVirtualTable(ast::CreateVirtualTable {
tbl_name,
if_not_exists,
module_name,
args,
}) => {
let this_table = normalize_ident(tbl_name.name.as_str());
if this_table != rename_from {
None
} else {
let new_stmt =
ast::Stmt::CreateVirtualTable(ast::CreateVirtualTable {
tbl_name: ast::QualifiedName {
db_name: tbl_name.db_name,
name: ast::Name::exact(
original_rename_to.to_string(),
),
alias: None,
},
if_not_exists,
module_name,
args,
});
Some(new_stmt.to_string())
}
}
_ => None, _ => None,
} }
}; };
@@ -8554,23 +8655,23 @@ pub fn op_rename_table(
.tables .tables
.remove(&normalized_from) .remove(&normalized_from)
.expect("table being renamed should be in schema"); .expect("table being renamed should be in schema");
match Arc::make_mut(&mut table) {
{ Table::BTree(btree) => {
let table = Arc::make_mut(&mut table); let btree = Arc::make_mut(btree);
// update this table's own foreign keys
let Table::BTree(btree) = table else { for fk_arc in &mut btree.foreign_keys {
panic!("only btree tables can be renamed"); let fk = Arc::make_mut(fk_arc);
}; if normalize_ident(&fk.parent_table) == normalized_from {
let btree = Arc::make_mut(btree); fk.parent_table = normalized_to.clone();
// update this table's own foreign keys }
for fk_arc in &mut btree.foreign_keys {
let fk = Arc::make_mut(fk_arc);
if normalize_ident(&fk.parent_table) == normalized_from {
fk.parent_table = normalized_to.clone();
} }
}
btree.name = normalized_to.to_owned(); btree.name = normalized_to.to_owned();
}
Table::Virtual(vtab) => {
Arc::make_mut(vtab).name = normalized_to.clone();
}
_ => panic!("only btree and virtual tables can be renamed"),
} }
schema.tables.insert(normalized_to.to_owned(), table); schema.tables.insert(normalized_to.to_owned(), table);

View File

@@ -515,12 +515,12 @@ pub fn insn_to_row(
0, 0,
"".into(), "".into(),
), ),
Insn::VRename{cursor_id, new_table_name} => ( Insn::VRename{cursor_id, new_name_reg} => (
"VCommit", "VRename",
*cursor_id as i32, *cursor_id as i32,
*new_name_reg as i32,
0, 0,
0, Value::build_text(""),
Value::build_text(&new_table_name),
0, 0,
"".into(), "".into(),
), ),

View File

@@ -428,7 +428,7 @@ pub enum Insn {
/// The database within which this virtual table needs to be renamed (P1). /// The database within which this virtual table needs to be renamed (P1).
cursor_id: CursorID, cursor_id: CursorID,
/// New name of the virtual table (P2). /// New name of the virtual table (P2).
new_table_name: String, new_name_reg: usize,
}, },
/// Open a cursor for a pseudo-table that contains a single row. /// Open a cursor for a pseudo-table that contains a single row.

View File

@@ -5,7 +5,7 @@ use crate::{Connection, LimboError, SymbolTable, Value};
use parking_lot::RwLock; use parking_lot::RwLock;
use std::ffi::c_void; use std::ffi::c_void;
use std::ptr::NonNull; use std::ptr::NonNull;
use std::sync::atomic::AtomicPtr; use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc; use std::sync::Arc;
use turso_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VTabModuleImpl}; use turso_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VTabModuleImpl};
use turso_parser::{ast, parser::Parser}; use turso_parser::{ast, parser::Parser};
@@ -23,9 +23,14 @@ pub struct VirtualTable {
pub(crate) columns: Vec<Column>, pub(crate) columns: Vec<Column>,
pub(crate) kind: VTabKind, pub(crate) kind: VTabKind,
vtab_type: VirtualTableType, vtab_type: VirtualTableType,
// identifier to tie a cursor to a specific instantiated virtual table instance
vtab_id: u64,
} }
impl VirtualTable { impl VirtualTable {
pub(crate) fn id(&self) -> u64 {
self.vtab_id
}
pub(crate) fn readonly(self: &Arc<VirtualTable>) -> bool { pub(crate) fn readonly(self: &Arc<VirtualTable>) -> bool {
match &self.vtab_type { match &self.vtab_type {
VirtualTableType::Pragma(_) => true, VirtualTableType::Pragma(_) => true,
@@ -44,6 +49,7 @@ impl VirtualTable {
.expect("pragma table-valued function schema resolution should not fail"), .expect("pragma table-valued function schema resolution should not fail"),
kind: VTabKind::TableValuedFunction, kind: VTabKind::TableValuedFunction,
vtab_type: VirtualTableType::Pragma(tab), vtab_type: VirtualTableType::Pragma(tab),
vtab_id: 0,
}; };
Arc::new(vtab) Arc::new(vtab)
}) })
@@ -67,6 +73,7 @@ impl VirtualTable {
.expect("internal table-valued function schema resolution should not fail"), .expect("internal table-valued function schema resolution should not fail"),
kind: VTabKind::TableValuedFunction, kind: VTabKind::TableValuedFunction,
vtab_type: VirtualTableType::Internal(Arc::new(RwLock::new(json_each))), vtab_type: VirtualTableType::Internal(Arc::new(RwLock::new(json_each))),
vtab_id: 0,
}; };
let json_tree = JsonVirtualTable::json_tree(); let json_tree = JsonVirtualTable::json_tree();
@@ -77,6 +84,7 @@ impl VirtualTable {
.expect("internal table-valued function schema resolution should not fail"), .expect("internal table-valued function schema resolution should not fail"),
kind: VTabKind::TableValuedFunction, kind: VTabKind::TableValuedFunction,
vtab_type: VirtualTableType::Internal(Arc::new(RwLock::new(json_tree))), vtab_type: VirtualTableType::Internal(Arc::new(RwLock::new(json_tree))),
vtab_id: 0,
}; };
vec![ vec![
@@ -101,6 +109,7 @@ impl VirtualTable {
columns: Self::resolve_columns(schema)?, columns: Self::resolve_columns(schema)?,
kind: VTabKind::TableValuedFunction, kind: VTabKind::TableValuedFunction,
vtab_type, vtab_type,
vtab_id: 0,
}; };
Ok(Arc::new(vtab)) Ok(Arc::new(vtab))
} }
@@ -119,6 +128,7 @@ impl VirtualTable {
columns: Self::resolve_columns(schema)?, columns: Self::resolve_columns(schema)?,
kind: VTabKind::VirtualTable, kind: VTabKind::VirtualTable,
vtab_type: VirtualTableType::External(table), vtab_type: VirtualTableType::External(table),
vtab_id: VTAB_ID_COUNTER.fetch_add(1, Ordering::Acquire),
}; };
Ok(Arc::new(vtab)) Ok(Arc::new(vtab))
} }
@@ -141,9 +151,9 @@ impl VirtualTable {
VirtualTableType::Pragma(table) => { VirtualTableType::Pragma(table) => {
Ok(VirtualTableCursor::Pragma(Box::new(table.open(conn)?))) Ok(VirtualTableCursor::Pragma(Box::new(table.open(conn)?)))
} }
VirtualTableType::External(table) => { VirtualTableType::External(table) => Ok(VirtualTableCursor::External(
Ok(VirtualTableCursor::External(table.open(conn.clone())?)) table.open(conn.clone(), self.vtab_id)?,
} )),
VirtualTableType::Internal(table) => { VirtualTableType::Internal(table) => {
Ok(VirtualTableCursor::Internal(table.read().open(conn)?)) Ok(VirtualTableCursor::Internal(table.read().open(conn)?))
} }
@@ -177,6 +187,54 @@ impl VirtualTable {
VirtualTableType::Internal(table) => table.read().best_index(constraints, order_by), VirtualTableType::Internal(table) => table.read().best_index(constraints, order_by),
} }
} }
pub(crate) fn begin(&self) -> crate::Result<()> {
match &self.vtab_type {
VirtualTableType::Pragma(_) => Err(LimboError::ExtensionError(
"Pragma virtual tables do not support transactions".to_string(),
)),
VirtualTableType::External(table) => table.begin(),
VirtualTableType::Internal(_) => Err(LimboError::ExtensionError(
"Internal virtual tables currently do not support transactions".to_string(),
)),
}
}
pub(crate) fn commit(&self) -> crate::Result<()> {
match &self.vtab_type {
VirtualTableType::Pragma(_) => Err(LimboError::ExtensionError(
"Pragma virtual tables do not support transactions".to_string(),
)),
VirtualTableType::External(table) => table.commit(),
VirtualTableType::Internal(_) => Err(LimboError::ExtensionError(
"Internal virtual tables currently do not support transactions".to_string(),
)),
}
}
pub(crate) fn rollback(&self) -> crate::Result<()> {
match &self.vtab_type {
VirtualTableType::Pragma(_) => Err(LimboError::ExtensionError(
"Pragma virtual tables do not support transactions".to_string(),
)),
VirtualTableType::External(table) => table.rollback(),
VirtualTableType::Internal(_) => Err(LimboError::ExtensionError(
"Internal virtual tables currently do not support transactions".to_string(),
)),
}
}
pub(crate) fn rename(&self, new_name: &str) -> crate::Result<()> {
match &self.vtab_type {
VirtualTableType::Pragma(_) => Err(LimboError::ExtensionError(
"Pragma virtual tables do not support renaming".to_string(),
)),
VirtualTableType::External(table) => table.rename(new_name),
VirtualTableType::Internal(_) => Err(LimboError::ExtensionError(
"Internal virtual tables currently do not support renaming".to_string(),
)),
}
}
} }
pub enum VirtualTableCursor { pub enum VirtualTableCursor {
@@ -226,51 +284,11 @@ impl VirtualTableCursor {
} }
} }
pub(crate) fn begin(&self) -> crate::Result<()> { pub(crate) fn vtab_id(&self) -> Option<u64> {
match self { match self {
VirtualTableCursor::Pragma(_) => Err(LimboError::ExtensionError( VirtualTableCursor::Pragma(_) => None,
"Pragma virtual tables do not support transactions".to_string(), VirtualTableCursor::External(cursor) => cursor.vtab_id.into(),
)), VirtualTableCursor::Internal(_) => None,
VirtualTableCursor::External(cursor) => cursor.begin(),
VirtualTableCursor::Internal(_) => Err(LimboError::ExtensionError(
"Internal virtual tables currently do not support transactions".to_string(),
)),
}
}
pub(crate) fn rollback(&self) -> crate::Result<()> {
match self {
VirtualTableCursor::Pragma(_) => Err(LimboError::ExtensionError(
"Pragma virtual tables do not support transactions".to_string(),
)),
VirtualTableCursor::External(cursor) => cursor.rollback(),
VirtualTableCursor::Internal(_) => Err(LimboError::ExtensionError(
"Internal virtual tables currently do not support transactions".to_string(),
)),
}
}
pub(crate) fn commit(&self) -> crate::Result<()> {
match self {
VirtualTableCursor::Pragma(_) => Err(LimboError::ExtensionError(
"Pragma virtual tables do not support transactions".to_string(),
)),
VirtualTableCursor::External(cursor) => cursor.commit(),
VirtualTableCursor::Internal(_) => Err(LimboError::ExtensionError(
"Internal virtual tables currently do not support transactions".to_string(),
)),
}
}
pub(crate) fn rename(&self, new_name: &str) -> crate::Result<()> {
match self {
VirtualTableCursor::Pragma(_) => Err(LimboError::ExtensionError(
"Pragma virtual tables do not support renaming".to_string(),
)),
VirtualTableCursor::External(cursor) => cursor.rename(new_name),
VirtualTableCursor::Internal(_) => Err(LimboError::ExtensionError(
"Internal virtual tables currently do not support renaming".to_string(),
)),
} }
} }
} }
@@ -280,6 +298,7 @@ pub(crate) struct ExtVirtualTable {
implementation: Arc<VTabModuleImpl>, implementation: Arc<VTabModuleImpl>,
table_ptr: AtomicPtr<c_void>, table_ptr: AtomicPtr<c_void>,
} }
static VTAB_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
impl Clone for ExtVirtualTable { impl Clone for ExtVirtualTable {
fn clone(&self) -> Self { fn clone(&self) -> Self {
@@ -338,7 +357,7 @@ impl ExtVirtualTable {
/// Accepts a pointer connection that owns the VTable, that the module /// Accepts a pointer connection that owns the VTable, that the module
/// can optionally use to query the other tables. /// can optionally use to query the other tables.
fn open(&self, conn: Arc<Connection>) -> crate::Result<ExtVirtualTableCursor> { fn open(&self, conn: Arc<Connection>, id: u64) -> crate::Result<ExtVirtualTableCursor> {
// we need a Weak<Connection> to upgrade and call from the extension. // we need a Weak<Connection> to upgrade and call from the extension.
let weak = Arc::downgrade(&conn); let weak = Arc::downgrade(&conn);
let weak_box = Box::into_raw(Box::new(weak)); let weak_box = Box::into_raw(Box::new(weak));
@@ -357,7 +376,7 @@ impl ExtVirtualTable {
}) else { }) else {
return Err(LimboError::ExtensionError("Open returned null".to_string())); return Err(LimboError::ExtensionError("Open returned null".to_string()));
}; };
ExtVirtualTableCursor::new(cursor, ext_conn_ptr, self.implementation.clone()) ExtVirtualTableCursor::new(cursor, ext_conn_ptr, self.implementation.clone(), id)
} }
fn update(&self, args: &[Value]) -> crate::Result<Option<i64>> { fn update(&self, args: &[Value]) -> crate::Result<Option<i64>> {
@@ -395,6 +414,50 @@ impl ExtVirtualTable {
_ => Err(LimboError::ExtensionError(rc.to_string())), _ => Err(LimboError::ExtensionError(rc.to_string())),
} }
} }
fn commit(&self) -> crate::Result<()> {
let rc = unsafe {
(self.implementation.commit)(self.table_ptr.load(std::sync::atomic::Ordering::SeqCst))
};
match rc {
ResultCode::OK => Ok(()),
_ => Err(LimboError::ExtensionError("Commit failed".to_string())),
}
}
fn begin(&self) -> crate::Result<()> {
let rc = unsafe {
(self.implementation.begin)(self.table_ptr.load(std::sync::atomic::Ordering::SeqCst))
};
match rc {
ResultCode::OK => Ok(()),
_ => Err(LimboError::ExtensionError("Begin failed".to_string())),
}
}
fn rollback(&self) -> crate::Result<()> {
let rc = unsafe {
(self.implementation.rollback)(self.table_ptr.load(std::sync::atomic::Ordering::SeqCst))
};
match rc {
ResultCode::OK => Ok(()),
_ => Err(LimboError::ExtensionError("Rollback failed".to_string())),
}
}
fn rename(&self, new_name: &str) -> crate::Result<()> {
let c_new_name = std::ffi::CString::new(new_name).unwrap();
let rc = unsafe {
(self.implementation.rename)(
self.table_ptr.load(std::sync::atomic::Ordering::SeqCst),
c_new_name.as_ptr(),
)
};
match rc {
ResultCode::OK => Ok(()),
_ => Err(LimboError::ExtensionError("Rename failed".to_string())),
}
}
} }
pub struct ExtVirtualTableCursor { pub struct ExtVirtualTableCursor {
@@ -403,6 +466,7 @@ pub struct ExtVirtualTableCursor {
// query other internal tables. // query other internal tables.
conn_ptr: Option<NonNull<turso_ext::Conn>>, conn_ptr: Option<NonNull<turso_ext::Conn>>,
implementation: Arc<VTabModuleImpl>, implementation: Arc<VTabModuleImpl>,
vtab_id: u64,
} }
impl ExtVirtualTableCursor { impl ExtVirtualTableCursor {
@@ -410,11 +474,13 @@ impl ExtVirtualTableCursor {
cursor: NonNull<c_void>, cursor: NonNull<c_void>,
conn_ptr: NonNull<turso_ext::Conn>, conn_ptr: NonNull<turso_ext::Conn>,
implementation: Arc<VTabModuleImpl>, implementation: Arc<VTabModuleImpl>,
id: u64,
) -> crate::Result<Self> { ) -> crate::Result<Self> {
Ok(Self { Ok(Self {
cursor, cursor,
conn_ptr: Some(conn_ptr), conn_ptr: Some(conn_ptr),
implementation, implementation,
vtab_id: id,
}) })
} }
@@ -422,39 +488,6 @@ impl ExtVirtualTableCursor {
unsafe { (self.implementation.rowid)(self.cursor.as_ptr()) } unsafe { (self.implementation.rowid)(self.cursor.as_ptr()) }
} }
fn begin(&self) -> crate::Result<()> {
let rc = unsafe { (self.implementation.begin)(self.cursor.as_ptr()) };
match rc {
ResultCode::OK => Ok(()),
_ => Err(LimboError::ExtensionError("Begin failed".to_string())),
}
}
fn rollback(&self) -> crate::Result<()> {
let rc = unsafe { (self.implementation.rollback)(self.cursor.as_ptr()) };
match rc {
ResultCode::OK => Ok(()),
_ => Err(LimboError::ExtensionError("Rollback failed".to_string())),
}
}
fn commit(&self) -> crate::Result<()> {
let rc = unsafe { (self.implementation.commit)(self.cursor.as_ptr()) };
match rc {
ResultCode::OK => Ok(()),
_ => Err(LimboError::ExtensionError("Commit failed".to_string())),
}
}
fn rename(&self, new_name: &str) -> crate::Result<()> {
let c_new_name = std::ffi::CString::new(new_name).unwrap();
let rc = unsafe { (self.implementation.rename)(self.cursor.as_ptr(), c_new_name.as_ptr()) };
match rc {
ResultCode::OK => Ok(()),
_ => Err(LimboError::ExtensionError("Rename failed".to_string())),
}
}
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
fn filter( fn filter(
&self, &self,