Merge 'Initial pass on incremental view maintenance with DBSP' from Glauber Costa

Implement very basic views using DBSP
This is just the bare minimum that I needed to convince myself that this
approach will work. The only views that we support are slices of the
main table: no aggregations, no joins, no projections.
* Drop view is implemented.
* View population is implemented.
* Deletes, inserts, and updates are implemented.

Much like indexes before, a flag must be passed to enable views.

Closes #2530
Committed by Pekka Enberg on 2025-08-11 14:09:35 +03:00 (via GitHub).
25 changed files with 4662 additions and 74 deletions
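Before the per-file diffs, a quick sketch of how the feature is exercised end to end. This is a hypothetical snippet, not part of the change: the four-argument `Connection::from_uri` and the `query`/`step`/`run_once` loop are taken from the diffs below (the CLI exposes the same switch as `--experimental-views`), while the `run` helper and the anyhow error handling are just for brevity.

use turso_core::{Connection, StepResult};

// Drive one statement to completion, ignoring any result rows.
fn run(conn: &Connection, sql: &str) -> anyhow::Result<()> {
    if let Some(mut rows) = conn.query(sql)? {
        loop {
            match rows.step()? {
                StepResult::Row => {}               // discard result rows
                StepResult::IO => rows.run_once()?, // drive pending IO
                _ => break,                         // Done / Interrupt / Busy
            }
        }
    }
    Ok(())
}

fn main() -> anyhow::Result<()> {
    // The new fourth argument opts in to experimental views
    // (indexes = true, mvcc = false, views = true).
    let (_io, conn) = Connection::from_uri("local.db", true, false, true)?;
    run(&conn, "CREATE TABLE t (k INTEGER PRIMARY KEY, v INTEGER)")?;
    run(&conn, "INSERT INTO t VALUES (5, 5)")?;
    // Only simple slices of a single table are supported for now.
    run(&conn, "CREATE VIEW big_v AS SELECT * FROM t WHERE v > 2")?;
    run(&conn, "SELECT * FROM big_v")?;
    Ok(())
}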

View File

@@ -20,7 +20,7 @@ pub unsafe extern "C" fn db_open(path: *const c_char) -> *mut c_void {
}
let path = unsafe { std::ffi::CStr::from_ptr(path) };
let path = path.to_str().unwrap();
let Ok((io, conn)) = Connection::from_uri(path, false, false) else {
let Ok((io, conn)) = Connection::from_uri(path, false, false, false) else {
panic!("Failed to open connection with path: {path}");
};
LimboConn::new(conn, io).to_ptr()

View File

@@ -318,7 +318,7 @@ impl Drop for Connection {
#[pyfunction(signature = (path, experimental_indexes=None))]
pub fn connect(path: &str, experimental_indexes: Option<bool>) -> Result<Connection> {
let experimental_indexes = experimental_indexes.unwrap_or(true);
match turso_core::Connection::from_uri(path, experimental_indexes, false) {
match turso_core::Connection::from_uri(path, experimental_indexes, false, false) {
Ok((io, conn)) => Ok(Connection { conn, _io: io }),
Err(e) => Err(PyErr::new::<ProgrammingError, _>(format!(
"Failed to create connection: {e:?}"

View File

@@ -60,6 +60,8 @@ pub struct Opts {
pub readonly: bool,
#[clap(long, help = "Enable experimental MVCC feature")]
pub experimental_mvcc: bool,
#[clap(long, help = "Enable experimental views feature")]
pub experimental_views: bool,
#[clap(long, help = "Enable experimental indexing feature")]
pub experimental_indexes: Option<bool>,
#[clap(short = 't', long, help = "specify output file for log traces")]
@@ -121,7 +123,12 @@ impl Limbo {
.map_or(":memory:".to_string(), |p| p.to_string_lossy().to_string());
let indexes_enabled = opts.experimental_indexes.unwrap_or(true);
let (io, conn) = if db_file.contains([':', '?', '&', '#']) {
Connection::from_uri(&db_file, indexes_enabled, opts.experimental_mvcc)?
Connection::from_uri(
&db_file,
indexes_enabled,
opts.experimental_mvcc,
opts.experimental_views,
)?
} else {
let flags = if opts.readonly {
OpenFlags::ReadOnly
@@ -134,6 +141,7 @@ impl Limbo {
flags,
indexes_enabled,
opts.experimental_mvcc,
opts.experimental_views,
)?;
let conn = db.connect()?;
(io, conn)
@@ -951,7 +959,11 @@ impl Limbo {
}
fn print_schema_entry(&mut self, db_display_name: &str, row: &turso_core::Row) -> bool {
if let Ok(Value::Text(schema)) = row.get::<&Value>(0) {
if let (Ok(Value::Text(schema)), Ok(Value::Text(obj_type)), Ok(Value::Text(obj_name))) = (
row.get::<&Value>(0),
row.get::<&Value>(1),
row.get::<&Value>(2),
) {
let modified_schema = if db_display_name == "main" {
schema.as_str().to_string()
} else {
@@ -982,12 +994,64 @@ impl Limbo {
}
};
let _ = self.write_fmt(format_args!("{modified_schema};"));
// For views, add the column comment like SQLite does
if obj_type.as_str() == "view" {
let columns = self
.get_view_columns(obj_name.as_str())
.unwrap_or_else(|_| "x".to_string());
let _ = self.write_fmt(format_args!("/* {}({}) */", obj_name.as_str(), columns));
}
true
} else {
false
}
}
/// Get column names for a view to generate the SQLite-compatible comment
fn get_view_columns(&mut self, view_name: &str) -> anyhow::Result<String> {
// Get column information using PRAGMA table_info
let pragma_sql = format!("PRAGMA table_info({view_name})");
match self.conn.query(&pragma_sql) {
Ok(Some(ref mut rows)) => {
let mut columns = Vec::new();
loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
// Column name is in the second column (index 1) of PRAGMA table_info
if let Ok(Value::Text(col_name)) = row.get::<&Value>(1) {
columns.push(col_name.as_str().to_string());
}
}
StepResult::IO => {
rows.run_once()?;
}
StepResult::Done => break,
StepResult::Interrupt => break,
StepResult::Busy => break,
}
}
if columns.is_empty() {
anyhow::bail!("PRAGMA table_info returned no columns for view '{}'. The view may be corrupted or the database schema is invalid.", view_name);
}
Ok(columns.join(","))
}
Ok(None) => {
anyhow::bail!("PRAGMA table_info('{}') returned no results. The view may not exist or the database schema is invalid.", view_name);
}
Err(e) => {
anyhow::bail!(
"Failed to execute PRAGMA table_info for view '{}': {}",
view_name,
e
);
}
}
}
fn query_one_table_schema(
&mut self,
db_prefix: &str,
@@ -995,7 +1059,7 @@ impl Limbo {
table_name: &str,
) -> anyhow::Result<bool> {
let sql = format!(
"SELECT sql FROM {db_prefix}.sqlite_schema WHERE type IN ('table', 'index') AND tbl_name = '{table_name}' AND name NOT LIKE 'sqlite_%'"
"SELECT sql, type, name FROM {db_prefix}.sqlite_schema WHERE type IN ('table', 'index', 'view') AND (tbl_name = '{table_name}' OR name = '{table_name}') AND name NOT LIKE 'sqlite_%' ORDER BY CASE type WHEN 'table' THEN 1 WHEN 'view' THEN 2 WHEN 'index' THEN 3 END, rowid"
);
let mut found = false;
@@ -1028,9 +1092,7 @@ impl Limbo {
db_prefix: &str,
db_display_name: &str,
) -> anyhow::Result<()> {
let sql = format!(
"SELECT sql FROM {db_prefix}.sqlite_schema WHERE type IN ('table', 'index') AND name NOT LIKE 'sqlite_%'"
);
let sql = format!("SELECT sql, type, name FROM {db_prefix}.sqlite_schema WHERE type IN ('table', 'index', 'view') AND name NOT LIKE 'sqlite_%' ORDER BY CASE type WHEN 'table' THEN 1 WHEN 'view' THEN 2 WHEN 'index' THEN 3 END, rowid");
match self.conn.query(&sql) {
Ok(Some(ref mut rows)) => loop {
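For illustration, the intended `.schema` output for a view, in a hypothetical session (the comment text mirrors the `format_args!("/* {}({}) */")` call above, with column names fetched via PRAGMA table_info; exact whitespace may differ):

CREATE TABLE t (k INTEGER PRIMARY KEY, v INTEGER);
CREATE VIEW big_v AS SELECT * FROM t WHERE v > 2;
/* big_v(k,v) */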

core/incremental/dbsp.rs (new file, 119 lines)
View File

@@ -0,0 +1,119 @@
// Simplified DBSP integration for incremental view maintenance
// For now, we'll use a basic approach and can expand to full DBSP later
use std::collections::HashMap;
/// A simplified ZSet for incremental computation
/// Each element has a weight: positive for additions, negative for deletions
#[derive(Clone, Debug, Default)]
pub struct SimpleZSet<T> {
data: HashMap<T, isize>,
}
impl<T: std::hash::Hash + Eq + Clone> SimpleZSet<T> {
pub fn new() -> Self {
Self {
data: HashMap::new(),
}
}
pub fn insert(&mut self, item: T, weight: isize) {
let current = self.data.get(&item).copied().unwrap_or(0);
let new_weight = current + weight;
if new_weight == 0 {
self.data.remove(&item);
} else {
self.data.insert(item, new_weight);
}
}
pub fn iter(&self) -> impl Iterator<Item = (&T, isize)> {
self.data.iter().map(|(k, &v)| (k, v))
}
/// Get all items with positive weights
pub fn to_vec(&self) -> Vec<T> {
self.data
.iter()
.filter(|(_, &weight)| weight > 0)
.map(|(item, _)| item.clone())
.collect()
}
pub fn merge(&mut self, other: &SimpleZSet<T>) {
for (item, weight) in other.iter() {
self.insert(item.clone(), weight);
}
}
}
/// A simplified stream for incremental computation
#[derive(Clone, Debug)]
pub struct SimpleStream<T> {
current: SimpleZSet<T>,
}
impl<T: std::hash::Hash + Eq + Clone> SimpleStream<T> {
pub fn from_zset(zset: SimpleZSet<T>) -> Self {
Self { current: zset }
}
/// Apply a delta (change) to the stream
pub fn apply_delta(&mut self, delta: &SimpleZSet<T>) {
self.current.merge(delta);
}
/// Get the current state as a vector of items (only positive weights)
pub fn to_vec(&self) -> Vec<T> {
self.current.to_vec()
}
}
// Type aliases for convenience
use super::hashable_row::HashableRow;
pub type RowKey = HashableRow;
pub type RowKeyZSet = SimpleZSet<RowKey>;
pub type RowKeyStream = SimpleStream<RowKey>;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_zset_merge_with_weights() {
let mut zset1 = SimpleZSet::new();
zset1.insert(1, 1); // Row 1 with weight +1
zset1.insert(2, 1); // Row 2 with weight +1
let mut zset2 = SimpleZSet::new();
zset2.insert(2, -1); // Row 2 with weight -1 (delete)
zset2.insert(3, 1); // Row 3 with weight +1 (insert)
zset1.merge(&zset2);
// Row 1: weight 1 (unchanged)
// Row 2: weight 0 (deleted)
// Row 3: weight 1 (inserted)
assert_eq!(zset1.iter().count(), 2); // Only rows 1 and 3
assert!(zset1.iter().any(|(k, _)| *k == 1));
assert!(zset1.iter().any(|(k, _)| *k == 3));
assert!(!zset1.iter().any(|(k, _)| *k == 2)); // Row 2 removed
}
#[test]
fn test_zset_represents_updates_as_delete_plus_insert() {
let mut zset = SimpleZSet::new();
// Initial state
zset.insert(1, 1);
// Update row 1: delete old + insert new
zset.insert(1, -1); // Delete old version
zset.insert(1, 1); // Insert new version
// Weight should be 1 (not 2)
let weight = zset.iter().find(|(k, _)| **k == 1).map(|(_, w)| w);
assert_eq!(weight, Some(1));
}
}
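Taken together, `insert`, `merge`, and `apply_delta` are the whole incremental story at this stage: a transaction's changes arrive as a delta ZSet and get folded into the stream. A minimal sketch using only the API above:

// Committed view contents: rows 1 and 2.
let mut base = SimpleZSet::new();
base.insert(1, 1);
base.insert(2, 1);
let mut stream = SimpleStream::from_zset(base);

// A commit that deletes row 2 and inserts row 3 arrives as a delta.
let mut delta = SimpleZSet::new();
delta.insert(2, -1);
delta.insert(3, 1);
stream.apply_delta(&delta);

// Row 2's weight cancelled to zero; rows 1 and 3 remain.
assert_eq!(stream.to_vec().len(), 2);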

View File

@@ -0,0 +1,80 @@
use crate::types::Value;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
// The DBSP paper uses the whole record as the key, with both the row key and the values. This is a
// bit confusing for us in databases, because when you say "key", it is easy to understand that as
// meaning the row key.
//
// Empirically speaking, using row keys as the ZSet keys will waste a competent but not brilliant
// engineer between 82 and 88 hours, depending on how you count. Hours that are never coming back.
//
// One of the situations in which using row keys completely breaks is table updates. If the "key"
// is the row key, let's say "5", then an update is a delete + insert. Imagine a table that had k =
// 5, v = 5, and a view that filters v > 2.
//
// Now we will do an update that changes v => 1. If the "key" is 5, then inside the Delta set, we
// will have (5, weight = -1), (5, weight = +1), and the whole thing just disappears. The Delta
// set, therefore, has to contain ((5, 5), weight = -1), ((5, 1), weight = +1).
//
// It is theoretically possible to use the row key in the ZSet and then use a hash of key ->
// Vec(changes) in the Delta set. But deviating from the paper here is just asking for trouble, as
// I am sure it would break somewhere else.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HashableRow {
pub rowid: i64,
pub values: Vec<Value>,
// Pre-computed hash: DBSP rows are immutable and frequently hashed during joins,
// making caching worthwhile despite the memory overhead
cached_hash: u64,
}
impl HashableRow {
pub fn new(rowid: i64, values: Vec<Value>) -> Self {
let cached_hash = Self::compute_hash(rowid, &values);
Self {
rowid,
values,
cached_hash,
}
}
fn compute_hash(rowid: i64, values: &[Value]) -> u64 {
let mut hasher = DefaultHasher::new();
rowid.hash(&mut hasher);
for value in values {
match value {
Value::Null => {
0u8.hash(&mut hasher);
}
Value::Integer(i) => {
1u8.hash(&mut hasher);
i.hash(&mut hasher);
}
Value::Float(f) => {
2u8.hash(&mut hasher);
f.to_bits().hash(&mut hasher);
}
Value::Text(s) => {
3u8.hash(&mut hasher);
s.value.hash(&mut hasher);
(s.subtype as u8).hash(&mut hasher);
}
Value::Blob(b) => {
4u8.hash(&mut hasher);
b.hash(&mut hasher);
}
}
}
hasher.finish()
}
}
impl Hash for HashableRow {
fn hash<H: Hasher>(&self, state: &mut H) {
self.cached_hash.hash(state);
}
}
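Replaying the k = 5, v = 5 example from the comment above with `HashableRow` as the ZSet key (a sketch pairing this type with `SimpleZSet` from dbsp.rs): because the old and new record versions are distinct keys, the update's delete and insert no longer cancel.

let old_version = HashableRow::new(5, vec![Value::Integer(5)]);
let new_version = HashableRow::new(5, vec![Value::Integer(1)]);

let mut delta = SimpleZSet::new();
delta.insert(old_version, -1); // delete (5, v = 5)
delta.insert(new_version, 1);  // insert (5, v = 1)

// Keyed by the bare rowid, both entries would share the key 5 and the
// delta would collapse to nothing. Keyed by the full record, both
// survive, so a view filtering v > 2 can retract the old row and skip
// the new one.
assert_eq!(delta.iter().count(), 2);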

core/incremental/mod.rs (new file, 4 lines)
View File

@@ -0,0 +1,4 @@
pub mod dbsp;
pub mod hashable_row;
pub mod operator;
pub mod view;

core/incremental/operator.rs (new file, 2027 lines)

File diff suppressed because it is too large.

core/incremental/view.rs (new file, 1209 lines)

File diff suppressed because it is too large.

View File

@@ -6,6 +6,7 @@ mod ext;
mod fast_lock;
mod function;
mod functions;
mod incremental;
mod info;
mod io;
#[cfg(feature = "json")]
@@ -31,6 +32,7 @@ mod uuid;
mod vdbe;
mod vector;
mod vtab;
mod vtab_view;
#[cfg(feature = "fuzz")]
pub mod numeric;
@@ -38,6 +40,7 @@ pub mod numeric;
#[cfg(not(feature = "fuzz"))]
mod numeric;
use crate::incremental::view::ViewTransactionState;
use crate::translate::optimizer::optimize_plan;
use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME;
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
@@ -129,6 +132,7 @@ pub struct Database {
init_lock: Arc<Mutex<()>>,
open_flags: OpenFlags,
builtin_syms: RefCell<SymbolTable>,
experimental_views: bool,
}
unsafe impl Send for Database {}
@@ -189,7 +193,14 @@ impl Database {
enable_mvcc: bool,
enable_indexes: bool,
) -> Result<Arc<Database>> {
Self::open_file_with_flags(io, path, OpenFlags::default(), enable_mvcc, enable_indexes)
Self::open_file_with_flags(
io,
path,
OpenFlags::default(),
enable_mvcc,
enable_indexes,
false,
)
}
#[cfg(feature = "fs")]
@@ -199,10 +210,19 @@ impl Database {
flags: OpenFlags,
enable_mvcc: bool,
enable_indexes: bool,
enable_views: bool,
) -> Result<Arc<Database>> {
let file = io.open_file(path, flags, true)?;
let db_file = Arc::new(DatabaseFile::new(file));
Self::open_with_flags(io, path, db_file, flags, enable_mvcc, enable_indexes)
Self::open_with_flags(
io,
path,
db_file,
flags,
enable_mvcc,
enable_indexes,
enable_views,
)
}
#[allow(clippy::arc_with_non_send_sync)]
@@ -220,6 +240,7 @@ impl Database {
OpenFlags::default(),
enable_mvcc,
enable_indexes,
false,
)
}
@@ -231,9 +252,18 @@ impl Database {
flags: OpenFlags,
enable_mvcc: bool,
enable_indexes: bool,
enable_views: bool,
) -> Result<Arc<Database>> {
if path == ":memory:" {
return Self::do_open_with_flags(io, path, db_file, flags, enable_mvcc, enable_indexes);
return Self::do_open_with_flags(
io,
path,
db_file,
flags,
enable_mvcc,
enable_indexes,
enable_views,
);
}
let mut registry = DATABASE_MANAGER.lock().unwrap();
@@ -246,7 +276,15 @@ impl Database {
if let Some(db) = registry.get(&canonical_path).and_then(Weak::upgrade) {
return Ok(db);
}
let db = Self::do_open_with_flags(io, path, db_file, flags, enable_mvcc, enable_indexes)?;
let db = Self::do_open_with_flags(
io,
path,
db_file,
flags,
enable_mvcc,
enable_indexes,
enable_views,
)?;
registry.insert(canonical_path, Arc::downgrade(&db));
Ok(db)
}
@@ -259,6 +297,7 @@ impl Database {
flags: OpenFlags,
enable_mvcc: bool,
enable_indexes: bool,
enable_views: bool,
) -> Result<Arc<Database>> {
let wal_path = format!("{path}-wal");
let maybe_shared_wal = WalFileShared::open_shared_if_exists(&io, wal_path.as_str())?;
@@ -293,6 +332,7 @@ impl Database {
open_flags: flags,
db_state: Arc::new(AtomicDbState::new(db_state)),
init_lock: Arc::new(Mutex::new(())),
experimental_views: enable_views,
});
db.register_global_builtin_extensions()
.expect("unable to register global extensions");
@@ -324,6 +364,13 @@ impl Database {
Ok(())
})?;
}
// FIXME: the correct way to do this is to just materialize the view.
// But this will allow us to keep going.
let conn = db.connect()?;
let pager = conn.pager.borrow().clone();
pager
.io
.block(|| conn.schema.borrow().populate_views(&conn))?;
Ok(db)
}
@@ -368,6 +415,7 @@ impl Database {
closed: Cell::new(false),
attached_databases: RefCell::new(DatabaseCatalog::new()),
query_only: Cell::new(false),
view_transaction_states: RefCell::new(HashMap::new()),
});
let builtin_syms = self.builtin_syms.borrow();
// add built-in extensions symbols to the connection to prevent having to load each time
@@ -464,6 +512,7 @@ impl Database {
flags: OpenFlags,
indexes: bool,
mvcc: bool,
views: bool,
) -> Result<(Arc<dyn IO>, Arc<Database>)>
where
S: AsRef<str> + std::fmt::Display,
@@ -490,7 +539,7 @@ impl Database {
}
},
};
let db = Self::open_file_with_flags(io.clone(), path, flags, mvcc, indexes)?;
let db = Self::open_file_with_flags(io.clone(), path, flags, mvcc, indexes, views)?;
Ok((io, db))
}
None => {
@@ -498,7 +547,7 @@ impl Database {
MEMORY_PATH => Arc::new(MemoryIO::new()),
_ => Arc::new(PlatformIO::new()?),
};
let db = Self::open_file_with_flags(io.clone(), path, flags, mvcc, indexes)?;
let db = Self::open_file_with_flags(io.clone(), path, flags, mvcc, indexes, views)?;
Ok((io, db))
}
}
@@ -537,6 +586,10 @@ impl Database {
pub fn get_mv_store(&self) -> Option<&Arc<MvStore>> {
self.mv_store.as_ref()
}
pub fn experimental_views_enabled(&self) -> bool {
self.experimental_views
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
@@ -729,6 +782,10 @@ pub struct Connection {
/// Attached databases
attached_databases: RefCell<DatabaseCatalog>,
query_only: Cell<bool>,
/// Per-connection view transaction states for uncommitted changes. This represents
/// one entry per view that was touched in the transaction.
view_transaction_states: RefCell<HashMap<String, ViewTransactionState>>,
}
impl Connection {
@@ -831,6 +888,10 @@ impl Connection {
let mut fresh = Schema::new(self.schema.borrow().indexes_enabled);
fresh.schema_version = cookie;
// Preserve existing views to avoid expensive repopulation.
// TODO: We may not need to do this if we materialize our views.
let existing_views = self.schema.borrow().views.clone();
// TODO: this is a hack to avoid a cyclical problem with schema reprepare
// The problem here is that we prepare a statement here, but when the statement tries
// to execute it, it first checks the schema cookie to see if it needs to reprepare the statement.
@@ -843,11 +904,16 @@ impl Connection {
let stmt = self.prepare("SELECT * FROM sqlite_schema")?;
// TODO: This function below is synchronous, make it async
parse_schema_rows(stmt, &mut fresh, &self.syms.borrow(), None)?;
parse_schema_rows(stmt, &mut fresh, &self.syms.borrow(), None, existing_views)?;
self.with_schema_mut(|schema| {
*schema = fresh;
});
{
let schema = self.schema.borrow();
pager.io.block(|| schema.populate_views(self))?;
}
Result::Ok(())
};
@@ -1073,18 +1139,32 @@ impl Connection {
uri: &str,
use_indexes: bool,
mvcc: bool,
views: bool,
) -> Result<(Arc<dyn IO>, Arc<Connection>)> {
use crate::util::MEMORY_PATH;
let opts = OpenOptions::parse(uri)?;
let flags = opts.get_flags()?;
if opts.path == MEMORY_PATH || matches!(opts.mode, OpenMode::Memory) {
let io = Arc::new(MemoryIO::new());
let db =
Database::open_file_with_flags(io.clone(), MEMORY_PATH, flags, mvcc, use_indexes)?;
let db = Database::open_file_with_flags(
io.clone(),
MEMORY_PATH,
flags,
mvcc,
use_indexes,
views,
)?;
let conn = db.connect()?;
return Ok((io, conn));
}
let (io, db) = Database::open_new(&opts.path, opts.vfs.as_ref(), flags, use_indexes, mvcc)?;
let (io, db) = Database::open_new(
&opts.path,
opts.vfs.as_ref(),
flags,
use_indexes,
mvcc,
views,
)?;
if let Some(modeof) = opts.modeof {
let perms = std::fs::metadata(modeof)?;
std::fs::set_permissions(&opts.path, perms.permissions())?;
@@ -1094,13 +1174,24 @@ impl Connection {
}
#[cfg(feature = "fs")]
fn from_uri_attached(uri: &str, use_indexes: bool, use_mvcc: bool) -> Result<Arc<Database>> {
fn from_uri_attached(
uri: &str,
use_indexes: bool,
use_mvcc: bool,
use_views: bool,
) -> Result<Arc<Database>> {
let mut opts = OpenOptions::parse(uri)?;
// FIXME: for now, only support read only attach
opts.mode = OpenMode::ReadOnly;
let flags = opts.get_flags()?;
let (_io, db) =
Database::open_new(&opts.path, opts.vfs.as_ref(), flags, use_indexes, use_mvcc)?;
let (_io, db) = Database::open_new(
&opts.path,
opts.vfs.as_ref(),
flags,
use_indexes,
use_mvcc,
use_views,
)?;
if let Some(modeof) = opts.modeof {
let perms = std::fs::metadata(modeof)?;
std::fs::set_permissions(&opts.path, perms.permissions())?;
@@ -1422,7 +1513,9 @@ impl Connection {
.expect("query must be parsed to statement");
let syms = self.syms.borrow();
self.with_schema_mut(|schema| {
if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(rows, schema, &syms, None)
let existing_views = schema.views.clone();
if let Err(LimboError::ExtensionError(e)) =
parse_schema_rows(rows, schema, &syms, None, existing_views)
{
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
@@ -1488,6 +1581,10 @@ impl Connection {
Ok(results)
}
pub fn experimental_views_enabled(&self) -> bool {
self._db.experimental_views_enabled()
}
/// Query the current value(s) of `pragma_name` associated to
/// `pragma_value`.
///
@@ -1583,8 +1680,9 @@ impl Connection {
.map_err(|_| LimboError::SchemaLocked)?
.indexes_enabled();
let use_mvcc = self._db.mv_store.is_some();
let use_views = self._db.experimental_views_enabled();
let db = Self::from_uri_attached(path, use_indexes, use_mvcc)?;
let db = Self::from_uri_attached(path, use_indexes, use_mvcc, use_views)?;
let pager = Rc::new(db.init_pager(None)?);
self.attached_databases

View File

@@ -1,3 +1,8 @@
use crate::incremental::view::IncrementalView;
use crate::types::IOResult;
/// Type alias for the views collection
pub type ViewsMap = HashMap<String, Arc<Mutex<IncrementalView>>>;
use crate::result::LimboResult;
use crate::storage::btree::BTreeCursor;
use crate::translate::collate::CollationSeq;
@@ -13,6 +18,7 @@ use std::collections::{BTreeSet, HashMap};
use std::ops::Deref;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::Mutex;
use tracing::trace;
use turso_sqlite3_parser::ast::{self, ColumnDefinition, Expr, Literal, SortOrder, TableOptions};
use turso_sqlite3_parser::{
@@ -26,11 +32,16 @@ const SCHEMA_TABLE_NAME_ALT: &str = "sqlite_master";
#[derive(Debug)]
pub struct Schema {
pub tables: HashMap<String, Arc<Table>>,
pub views: ViewsMap,
/// table_name to list of indexes for the table
pub indexes: HashMap<String, Vec<Arc<Index>>>,
pub has_indexes: std::collections::HashSet<String>,
pub indexes_enabled: bool,
pub schema_version: u32,
/// Mapping from table names to the views that depend on them
pub table_to_views: HashMap<String, Vec<String>>,
}
impl Schema {
@@ -49,12 +60,16 @@ impl Schema {
Arc::new(Table::Virtual(Arc::new((*function).clone()))),
);
}
let views: ViewsMap = HashMap::new();
let table_to_views: HashMap<String, Vec<String>> = HashMap::new();
Self {
tables,
views,
indexes,
has_indexes,
indexes_enabled,
schema_version: 0,
table_to_views,
}
}
@@ -64,6 +79,68 @@ impl Schema {
.iter()
.any(|idx| idx.1.iter().any(|i| i.name == name))
}
pub fn add_view(&mut self, view: IncrementalView) {
let name = normalize_ident(view.name());
self.views.insert(name, Arc::new(Mutex::new(view)));
}
pub fn get_view(&self, name: &str) -> Option<Arc<Mutex<IncrementalView>>> {
let name = normalize_ident(name);
self.views.get(&name).cloned()
}
pub fn remove_view(&mut self, name: &str) -> Option<Arc<Mutex<IncrementalView>>> {
let name = normalize_ident(name);
// Remove from table_to_views dependencies
for views in self.table_to_views.values_mut() {
views.retain(|v| v != &name);
}
// Remove the view itself
self.views.remove(&name)
}
/// Register that a view depends on a table
pub fn add_view_dependency(&mut self, table_name: &str, view_name: &str) {
let table_name = normalize_ident(table_name);
let view_name = normalize_ident(view_name);
self.table_to_views
.entry(table_name)
.or_default()
.push(view_name);
}
/// Get all views that depend on a given table
pub fn get_dependent_views(&self, table_name: &str) -> Vec<String> {
let table_name = normalize_ident(table_name);
self.table_to_views
.get(&table_name)
.cloned()
.unwrap_or_default()
}
/// Populate all views by scanning their source tables
/// Returns IOResult to support async execution
pub fn populate_views(&self, conn: &Arc<crate::Connection>) -> Result<IOResult<()>> {
for view in self.views.values() {
let mut view = view
.lock()
.map_err(|_| LimboError::InternalError("Failed to lock view".to_string()))?;
match view.populate_from_table(conn)? {
IOResult::Done(()) => {
// This view is done, continue to next
continue;
}
IOResult::IO => {
// This view needs more IO, return early
return Ok(IOResult::IO);
}
}
}
Ok(IOResult::Done(()))
}
pub fn add_btree_table(&mut self, table: Arc<BTreeTable>) {
let name = normalize_ident(&table.name);
@@ -160,6 +237,9 @@ impl Schema {
let mut automatic_indices: HashMap<String, Vec<(String, usize)>> =
HashMap::with_capacity(10);
// Collect views for second pass to populate table_to_views mapping
let mut views_to_process: Vec<(String, Vec<String>)> = Vec::new();
if matches!(pager.begin_read_tx()?, LimboResult::Busy) {
return Err(LimboError::Busy);
}
@@ -271,6 +351,35 @@ impl Schema {
}
}
}
"view" => {
let name_value = record_cursor.get_value(&row, 1)?;
let RefValue::Text(name_text) = name_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let name = name_text.as_str();
let sql_value = record_cursor.get_value(&row, 4)?;
let RefValue::Text(sql_text) = sql_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let sql = sql_text.as_str();
// Create IncrementalView directly
if let Ok(incremental_view) = IncrementalView::from_sql(sql, self) {
// Get referenced table names before moving the view
let referenced_tables = incremental_view.get_referenced_table_names();
let view_name = name.to_string();
// Add to schema (moves incremental_view)
self.add_view(incremental_view);
// Store for second pass processing
views_to_process.push((view_name, referenced_tables));
} else {
eprintln!("Warning: Could not create incremental view for: {name}");
}
}
_ => {}
};
drop(record_cursor);
@@ -281,6 +390,14 @@ impl Schema {
pager.end_read_tx()?;
// Second pass: populate table_to_views mapping
for (view_name, referenced_tables) in views_to_process {
// Register this view as dependent on each referenced table
for table_name in referenced_tables {
self.add_view_dependency(&table_name, &view_name);
}
}
for unparsed_sql_from_index in from_sql_indexes {
if !self.indexes_enabled() {
self.table_set_has_index(&unparsed_sql_from_index.table_name);
@@ -357,12 +474,19 @@ impl Clone for Schema {
(name.clone(), indexes)
})
.collect();
let views = self
.views
.iter()
.map(|(name, view)| (name.clone(), view.clone()))
.collect();
Self {
tables,
views,
indexes,
has_indexes: self.has_indexes.clone(),
indexes_enabled: self.indexes_enabled,
schema_version: self.schema_version,
table_to_views: self.table_to_views.clone(),
}
}
}

View File

@@ -1189,9 +1189,9 @@ fn emit_update_insns(
flag: if has_user_provided_rowid {
// The previous Insn::NotExists and Insn::Delete seek to the old rowid,
// so to insert a new user-provided rowid, we need to seek to the correct place.
InsertFlags::new().require_seek()
InsertFlags::new().require_seek().update()
} else {
InsertFlags::new()
InsertFlags::new().update()
},
table_name: table_ref.identifier.clone(),
});

View File

@@ -34,6 +34,7 @@ pub(crate) mod subquery;
pub(crate) mod transaction;
pub(crate) mod update;
mod values;
pub(crate) mod view;
use crate::schema::Schema;
use crate::storage::pager::Pager;
@@ -127,6 +128,7 @@ pub fn translate_inner(
| ast::Stmt::Delete(..)
| ast::Stmt::DropIndex { .. }
| ast::Stmt::DropTable { .. }
| ast::Stmt::DropView { .. }
| ast::Stmt::Reindex { .. }
| ast::Stmt::Update(..)
| ast::Stmt::Insert(..)
@@ -186,7 +188,16 @@ pub fn translate_inner(
program,
)?,
ast::Stmt::CreateTrigger { .. } => bail_parse_error!("CREATE TRIGGER not supported yet"),
ast::Stmt::CreateView { .. } => bail_parse_error!("CREATE VIEW not supported yet"),
ast::Stmt::CreateView {
view_name, select, ..
} => view::translate_create_view(
schema,
view_name.name.as_str(),
&select,
connection.clone(),
syms,
program,
)?,
ast::Stmt::CreateVirtualTable(vtab) => {
translate_create_virtual_table(*vtab, schema, syms, program)?
}
@@ -230,7 +241,16 @@ pub fn translate_inner(
tbl_name,
} => translate_drop_table(tbl_name, if_exists, schema, syms, program)?,
ast::Stmt::DropTrigger { .. } => bail_parse_error!("DROP TRIGGER not supported yet"),
ast::Stmt::DropView { .. } => bail_parse_error!("DROP VIEW not supported yet"),
ast::Stmt::DropView {
if_exists,
view_name,
} => view::translate_drop_view(
schema,
view_name.name.as_str(),
if_exists,
connection.clone(),
program,
)?,
ast::Stmt::Pragma(..) => {
bail_parse_error!("PRAGMA statement cannot be evaluated in a nested context")
}

View File

@@ -4,8 +4,8 @@ use super::{
expr::walk_expr,
plan::{
Aggregate, ColumnUsedMask, Distinctness, EvalAt, JoinInfo, JoinOrderMember, JoinedTable,
Operation, OuterQueryReference, Plan, QueryDestination, ResultSetColumn, TableReferences,
WhereTerm,
Operation, OuterQueryReference, Plan, QueryDestination, ResultSetColumn, Scan,
TableReferences, WhereTerm,
},
select::prepare_select_plan,
SymbolTable,
@@ -470,6 +470,35 @@ fn parse_table(
return Ok(());
};
let view = connection.with_schema(database_id, |schema| schema.get_view(table_name.as_str()));
if let Some(view) = view {
// Create a virtual table wrapper for the view
// We'll use the view's columns from the schema
let vtab = crate::vtab_view::create_view_virtual_table(table_name.as_str(), view.clone())?;
let alias = maybe_alias
.map(|a| match a {
ast::As::As(id) => id,
ast::As::Elided(id) => id,
})
.map(|a| a.as_str().to_string());
table_references.add_joined_table(JoinedTable {
op: Operation::Scan(Scan::VirtualTable {
idx_num: -1,
idx_str: None,
constraints: Vec::new(),
}),
table: Table::Virtual(vtab),
identifier: alias.unwrap_or(normalized_qualified_name),
internal_id: table_ref_counter.next(),
join_info: None,
col_used_mask: ColumnUsedMask::default(),
database_id,
});
return Ok(());
}
// CTEs are transformed into FROM clause subqueries.
// If we find a CTE with this name in our outer query references,
// we can use it as a joined table, but we must clone it since it's not MATERIALIZED.

View File

@@ -419,46 +419,19 @@ fn query_pragma(
Ok((program, TransactionMode::Read))
}
PragmaName::TableInfo => {
let table = match value {
Some(ast::Expr::Name(name)) => {
let tbl = normalize_ident(name.as_str());
schema.get_table(&tbl)
}
let name = match value {
Some(ast::Expr::Name(name)) => Some(normalize_ident(name.as_str())),
_ => None,
};
let base_reg = register;
program.alloc_registers(5);
if let Some(table) = table {
// According to the SQLite documentation: "The 'cid' column should not be taken to
// mean more than 'rank within the current result set'."
// Therefore, we enumerate only after filtering out hidden columns.
for (i, column) in table.columns().iter().filter(|col| !col.hidden).enumerate() {
// cid
program.emit_int(i as i64, base_reg);
// name
program.emit_string8(column.name.clone().unwrap_or_default(), base_reg + 1);
// type
program.emit_string8(column.ty_str.clone(), base_reg + 2);
// notnull
program.emit_bool(column.notnull, base_reg + 3);
// dflt_value
match &column.default {
None => {
program.emit_null(base_reg + 4, None);
}
Some(expr) => {
program.emit_string8(expr.to_string(), base_reg + 4);
}
}
// pk
program.emit_bool(column.primary_key, base_reg + 5);
program.emit_result_row(base_reg, 6);
if let Some(name) = name {
if let Some(table) = schema.get_table(&name) {
emit_columns_for_table_info(&mut program, table.columns(), base_reg);
} else if let Some(view_mutex) = schema.get_view(&name) {
let view = view_mutex.lock().unwrap();
emit_columns_for_table_info(&mut program, &view.columns, base_reg);
}
}
let col_names = ["cid", "name", "type", "notnull", "dflt_value", "pk"];
@@ -565,6 +538,45 @@ fn query_pragma(
}
}
/// Helper function to emit column information for PRAGMA table_info
/// Used by both tables and views since they now have the same column emission logic
fn emit_columns_for_table_info(
program: &mut ProgramBuilder,
columns: &[crate::schema::Column],
base_reg: usize,
) {
// According to the SQLite documentation: "The 'cid' column should not be taken to
// mean more than 'rank within the current result set'."
// Therefore, we enumerate only after filtering out hidden columns.
for (i, column) in columns.iter().filter(|col| !col.hidden).enumerate() {
// cid
program.emit_int(i as i64, base_reg);
// name
program.emit_string8(column.name.clone().unwrap_or_default(), base_reg + 1);
// type
program.emit_string8(column.ty_str.clone(), base_reg + 2);
// notnull
program.emit_bool(column.notnull, base_reg + 3);
// dflt_value
match &column.default {
None => {
program.emit_null(base_reg + 4, None);
}
Some(expr) => {
program.emit_string8(expr.to_string(), base_reg + 4);
}
}
// pk
program.emit_bool(column.primary_key, base_reg + 5);
program.emit_result_row(base_reg, 6);
}
}
fn update_auto_vacuum_mode(
auto_vacuum_mode: AutoVacuumMode,
largest_root_page_number: u32,

View File

@@ -182,6 +182,7 @@ pub fn translate_create_table(
pub enum SchemaEntryType {
Table,
Index,
View,
}
impl SchemaEntryType {
@@ -189,6 +190,7 @@ impl SchemaEntryType {
match self {
SchemaEntryType::Table => "table",
SchemaEntryType::Index => "index",
SchemaEntryType::View => "view",
}
}
}

core/translate/view.rs (new file, 221 lines)
View File

@@ -0,0 +1,221 @@
use crate::schema::Schema;
use crate::translate::emitter::Resolver;
use crate::translate::schema::{emit_schema_entry, SchemaEntryType, SQLITE_TABLEID};
use crate::util::normalize_ident;
use crate::vdbe::builder::{CursorType, ProgramBuilder};
use crate::vdbe::insn::{CmpInsFlags, Cookie, Insn};
use crate::{Connection, Result, SymbolTable};
use std::sync::Arc;
use turso_sqlite3_parser::ast::{self, fmt::ToTokens};
pub fn translate_create_view(
schema: &Schema,
view_name: &str,
select_stmt: &ast::Select,
connection: Arc<Connection>,
syms: &SymbolTable,
mut program: ProgramBuilder,
) -> Result<ProgramBuilder> {
// Check if experimental views are enabled
if !connection.experimental_views_enabled() {
return Err(crate::LimboError::ParseError(
"CREATE VIEW is an experimental feature. Enable with --experimental-views flag"
.to_string(),
));
}
let normalized_view_name = normalize_ident(view_name);
// Check if view already exists
if schema.get_view(&normalized_view_name).is_some() {
return Err(crate::LimboError::ParseError(format!(
"View {normalized_view_name} already exists"
)));
}
// Validate that this view can be created as an IncrementalView
// This validation happens before updating sqlite_master to prevent
// storing invalid view definitions
use crate::incremental::view::IncrementalView;
IncrementalView::can_create_view(select_stmt, schema)?;
// Reconstruct the SQL string
let sql = create_view_to_str(view_name, select_stmt);
// Open cursor to sqlite_schema table
let table = schema.get_btree_table(SQLITE_TABLEID).unwrap();
let sqlite_schema_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id,
root_page: 1usize.into(),
db: 0,
});
// Add the view entry to sqlite_schema
let resolver = Resolver::new(schema, syms);
emit_schema_entry(
&mut program,
&resolver,
sqlite_schema_cursor_id,
None, // cdc_table_cursor_id, no cdc for views
SchemaEntryType::View,
&normalized_view_name,
&normalized_view_name, // for views, tbl_name is same as name
0, // views don't have a root page
Some(sql.clone()),
)?;
// Parse schema to load the new view
program.emit_insn(Insn::ParseSchema {
db: sqlite_schema_cursor_id,
where_clause: Some(format!("name = '{normalized_view_name}'")),
});
// Populate the new view
program.emit_insn(Insn::PopulateViews);
program.epilogue(schema);
Ok(program)
}
fn create_view_to_str(view_name: &str, select_stmt: &ast::Select) -> String {
format!(
"CREATE VIEW {} AS {}",
view_name,
select_stmt.format().unwrap()
)
}
pub fn translate_drop_view(
schema: &Schema,
view_name: &str,
if_exists: bool,
connection: Arc<Connection>,
mut program: ProgramBuilder,
) -> Result<ProgramBuilder> {
// Check if experimental views are enabled
if !connection.experimental_views_enabled() {
return Err(crate::LimboError::ParseError(
"DROP VIEW is an experimental feature. Enable with --experimental-views flag"
.to_string(),
));
}
let normalized_view_name = normalize_ident(view_name);
// Check if view exists
let view_exists = schema.get_view(&normalized_view_name).is_some();
if !view_exists && !if_exists {
return Err(crate::LimboError::ParseError(format!(
"no such view: {normalized_view_name}"
)));
}
if !view_exists && if_exists {
// View doesn't exist but IF EXISTS was specified, nothing to do
return Ok(program);
}
// Open cursor to sqlite_schema table
let schema_table = schema.get_btree_table(SQLITE_TABLEID).unwrap();
let sqlite_schema_cursor_id =
program.alloc_cursor_id(CursorType::BTreeTable(schema_table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: sqlite_schema_cursor_id,
root_page: 1usize.into(),
db: 0,
});
// Allocate registers for searching
let view_name_reg = program.alloc_register();
let type_reg = program.alloc_register();
let rowid_reg = program.alloc_register();
// Set the view name and type we're looking for
program.emit_insn(Insn::String8 {
dest: view_name_reg,
value: normalized_view_name.clone(),
});
program.emit_insn(Insn::String8 {
dest: type_reg,
value: "view".to_string(),
});
// Start scanning from the beginning
let end_loop_label = program.allocate_label();
let loop_start_label = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: sqlite_schema_cursor_id,
pc_if_empty: end_loop_label,
});
program.preassign_label_to_next_insn(loop_start_label);
// Check if this row is the view we're looking for
// Column 0 is type, Column 1 is name, Column 2 is tbl_name
let col0_reg = program.alloc_register();
let col1_reg = program.alloc_register();
program.emit_column(sqlite_schema_cursor_id, 0, col0_reg);
program.emit_column(sqlite_schema_cursor_id, 1, col1_reg);
// Check if type == 'view' and name == view_name
let skip_delete_label = program.allocate_label();
program.emit_insn(Insn::Ne {
lhs: col0_reg,
rhs: type_reg,
target_pc: skip_delete_label,
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
});
program.emit_insn(Insn::Ne {
lhs: col1_reg,
rhs: view_name_reg,
target_pc: skip_delete_label,
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
});
// Get the rowid and delete this row
program.emit_insn(Insn::RowId {
cursor_id: sqlite_schema_cursor_id,
dest: rowid_reg,
});
program.emit_insn(Insn::Delete {
cursor_id: sqlite_schema_cursor_id,
table_name: "sqlite_schema".to_string(),
});
program.resolve_label(skip_delete_label, program.offset());
// Move to next row
program.emit_insn(Insn::Next {
cursor_id: sqlite_schema_cursor_id,
pc_if_next: loop_start_label,
});
program.preassign_label_to_next_insn(end_loop_label);
// Remove the view from the in-memory schema
program.emit_insn(Insn::DropView {
db: 0,
view_name: normalized_view_name.clone(),
});
// Update schema version (increment schema cookie)
let schema_version_reg = program.alloc_register();
program.emit_insn(Insn::Integer {
dest: schema_version_reg,
value: (schema.schema_version + 1) as i64,
});
program.emit_insn(Insn::SetCookie {
db: 0,
cookie: Cookie::SchemaVersion,
value: (schema.schema_version + 1) as i32,
p5: 1, // update version
});
program.epilogue(schema);
Ok(program)
}

View File

@@ -2,17 +2,21 @@
use crate::translate::expr::WalkControl;
use crate::types::IOResult;
use crate::{
schema::{self, Column, Schema, Type},
schema::{self, Column, Schema, Type, ViewsMap},
translate::{collate::CollationSeq, expr::walk_expr, plan::JoinOrderMember},
types::{Value, ValueType},
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable,
};
use crate::{Connection, IO};
use std::{rc::Rc, sync::Arc};
use std::{
rc::Rc,
sync::{Arc, Mutex},
};
use tracing::{instrument, Level};
use turso_sqlite3_parser::ast::{
self, CreateTableBody, Expr, FunctionTail, Literal, UnaryOperator,
self, fmt::ToTokens, Cmd, CreateTableBody, Expr, FunctionTail, Literal, Stmt, UnaryOperator,
};
use turso_sqlite3_parser::lexer::sql::Parser;
pub trait IOExt {
fn block<T>(&self, f: impl FnMut() -> Result<IOResult<T>>) -> Result<T>;
@@ -73,18 +77,22 @@ pub fn parse_schema_rows(
schema: &mut Schema,
syms: &SymbolTable,
mv_tx_id: Option<u64>,
mut existing_views: ViewsMap,
) -> Result<()> {
rows.set_mv_tx_id(mv_tx_id);
// TODO: if we hit IO, these unparsed indexes are lost. Will probably need some state between
// IO runs
let mut from_sql_indexes = Vec::with_capacity(10);
let mut automatic_indices = std::collections::HashMap::with_capacity(10);
// Collect views for second pass to populate table_to_views mapping
let mut views_to_process: Vec<(String, Vec<String>)> = Vec::new();
loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let ty = row.get::<&str>(0)?;
if !["table", "index"].contains(&ty) {
if !["table", "index", "view"].contains(&ty) {
continue;
}
match ty {
@@ -141,6 +149,65 @@ pub fn parse_schema_rows(
}
}
}
"view" => {
use crate::incremental::view::IncrementalView;
use fallible_iterator::FallibleIterator;
let name: &str = row.get::<&str>(1)?;
let sql: &str = row.get::<&str>(4)?;
let view_name = name.to_string();
// Try to remove and potentially reuse an existing view with this name.
// Note: After this function completes, any views not reused are discarded,
// as they are no longer relevant in the new schema.
let should_create_new =
if let Some(existing_view) = existing_views.remove(&view_name) {
// Check if we can reuse this view (same SQL definition)
let can_reuse = if let Ok(view_guard) = existing_view.lock() {
view_guard.has_same_sql(sql)
} else {
false
};
if can_reuse {
// Reuse the existing view - it's already populated!
let referenced_tables =
if let Ok(view_guard) = existing_view.lock() {
view_guard.get_referenced_table_names()
} else {
vec![]
};
// Add the existing view to the new schema
schema.views.insert(view_name.clone(), existing_view);
// Store for second pass processing
views_to_process.push((view_name.clone(), referenced_tables));
false // Don't create new
} else {
true // SQL changed, need to create new
}
} else {
true // No existing view, need to create new
};
if should_create_new {
// Create a new IncrementalView
match IncrementalView::from_sql(sql, schema) {
Ok(incremental_view) => {
let referenced_tables =
incremental_view.get_referenced_table_names();
schema.add_view(incremental_view);
views_to_process.push((view_name, referenced_tables));
}
Err(e) => {
eprintln!(
"Warning: Could not create incremental view for {name}: {e:?}"
);
}
}
}
}
_ => continue,
}
}
@@ -183,6 +250,15 @@ pub fn parse_schema_rows(
}
}
}
// Second pass: populate table_to_views mapping
for (view_name, referenced_tables) in views_to_process {
// Register this view as dependent on each referenced table
for table_name in referenced_tables {
schema.add_view_dependency(&table_name, &view_name);
}
}
Ok(())
}
@@ -1094,6 +1170,151 @@ pub fn parse_pragma_bool(expr: &Expr) -> Result<bool> {
))
}
/// Extract column name from an expression (e.g., for SELECT clauses)
pub fn extract_column_name_from_expr(expr: &ast::Expr) -> Option<String> {
match expr {
ast::Expr::Id(name) => Some(name.as_str().to_string()),
ast::Expr::Qualified(_, name) => Some(name.as_str().to_string()),
_ => None,
}
}
/// Extract column information from a SELECT statement for view creation
pub fn extract_view_columns(select_stmt: &ast::Select, schema: &Schema) -> Vec<Column> {
let mut columns = Vec::new();
// Navigate to the first SELECT in the statement
if let ast::OneSelect::Select(select_core) = select_stmt.body.select.as_ref() {
// First, we need to figure out which table(s) are being selected from
let table_name = if let Some(from) = &select_core.from {
if let Some(ast::SelectTable::Table(qualified_name, _, _)) = from.select.as_deref() {
Some(normalize_ident(qualified_name.name.as_str()))
} else {
None
}
} else {
None
};
// Get the table for column resolution
let _table = table_name.as_ref().and_then(|name| schema.get_table(name));
// Process each column in the SELECT list
for (i, result_col) in select_core.columns.iter().enumerate() {
match result_col {
ast::ResultColumn::Expr(expr, alias) => {
let name = alias
.as_ref()
.map(|a| match a {
ast::As::Elided(name) => name.as_str().to_string(),
ast::As::As(name) => name.as_str().to_string(),
})
.or_else(|| extract_column_name_from_expr(expr))
.unwrap_or_else(|| {
// If we can't extract a simple column name, use the expression itself
expr.format().unwrap_or_else(|_| format!("column_{i}"))
});
columns.push(Column {
name: Some(name),
ty: Type::Text, // Default to TEXT, could be refined with type analysis
ty_str: "TEXT".to_string(),
primary_key: false, // Views don't have primary keys
is_rowid_alias: false,
notnull: false, // Views typically don't enforce NOT NULL
default: None, // Views don't have default values
unique: false,
collation: None,
hidden: false,
});
}
ast::ResultColumn::Star => {
// For SELECT *, expand to all columns from the table
if let Some(ref table_name) = table_name {
if let Some(table) = schema.get_table(table_name) {
// Copy all columns from the table, but adjust for view constraints
for table_column in table.columns() {
columns.push(Column {
name: table_column.name.clone(),
ty: table_column.ty,
ty_str: table_column.ty_str.clone(),
primary_key: false, // Views don't have primary keys
is_rowid_alias: false,
notnull: false, // Views typically don't enforce NOT NULL
default: None, // Views don't have default values
unique: false,
collation: table_column.collation,
hidden: false,
});
}
} else {
// Table not found, create placeholder
columns.push(Column {
name: Some("*".to_string()),
ty: Type::Text,
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
hidden: false,
});
}
} else {
// No FROM clause or couldn't determine table, create placeholder
columns.push(Column {
name: Some("*".to_string()),
ty: Type::Text,
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
hidden: false,
});
}
}
ast::ResultColumn::TableStar(table_name) => {
// For table.*, expand to all columns from the specified table
let table_name_str = normalize_ident(table_name.as_str());
if let Some(table) = schema.get_table(&table_name_str) {
// Copy all columns from the table, but adjust for view constraints
for table_column in table.columns() {
columns.push(Column {
name: table_column.name.clone(),
ty: table_column.ty,
ty_str: table_column.ty_str.clone(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: table_column.collation,
hidden: false,
});
}
} else {
// Table not found, create placeholder
columns.push(Column {
name: Some(format!("{table_name_str}.*")),
ty: Type::Text,
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
hidden: false,
});
}
}
}
}
}
columns
}
#[cfg(test)]
pub mod tests {
use super::*;

View File

@@ -5120,7 +5120,7 @@ pub fn op_insert(
key_reg,
record_reg,
flag,
table_name: _,
table_name,
},
insn
);
@@ -5163,6 +5163,72 @@ pub fn op_insert(
Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."),
};
// Update dependent views for incremental computation
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
if !dependent_views.is_empty() {
// If this is an UPDATE operation, first capture the old row data and record its deletion
if flag.has(InsertFlags::UPDATE) {
// Get the old record before it's overwritten
let old_record_values = if let Some(old_record) = return_if_io!(cursor.record()) {
let mut values = old_record
.get_values()
.into_iter()
.map(|v| v.to_owned())
.collect::<Vec<_>>();
// Fix rowid alias columns: replace Null with actual rowid value
let schema = program.connection.schema.borrow();
if let Some(table) = schema.get_table(table_name) {
for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < values.len() {
values[i] = Value::Integer(key);
}
}
}
drop(schema);
Some(values)
} else {
None
};
// Add deletion of old row to view deltas
if let Some(old_values) = old_record_values {
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
for view_name in &dependent_views {
let tx_state = tx_states.entry(view_name.clone()).or_default();
tx_state.delta.delete(key, old_values.clone());
}
}
}
// Add insertion of new row to view deltas
let mut new_values = record
.get_values()
.into_iter()
.map(|v| v.to_owned())
.collect::<Vec<_>>();
// Fix rowid alias columns: replace Null with actual rowid value
let schema = program.connection.schema.borrow();
if let Some(table) = schema.get_table(table_name) {
for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < new_values.len() {
new_values[i] = Value::Integer(key);
}
}
}
drop(schema);
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
for view_name in dependent_views {
let tx_state = tx_states.entry(view_name.clone()).or_default();
tx_state.delta.insert(key, new_values.clone());
}
}
// In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location.
// This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because:
// 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position.
@@ -5219,14 +5285,63 @@ pub fn op_delete(
load_insn!(
Delete {
cursor_id,
table_name: _
table_name
},
insn
);
{
// Capture row data before deletion for view updates
let record_key_and_values = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
let result = if !dependent_views.is_empty() {
// Get the current key
let maybe_key = return_if_io!(cursor.rowid());
let key = maybe_key.ok_or_else(|| {
LimboError::InternalError("Cannot delete: no current row".to_string())
})?;
// Get the current record before deletion and extract values
if let Some(record) = return_if_io!(cursor.record()) {
let mut values = record
.get_values()
.into_iter()
.map(|v| v.to_owned())
.collect::<Vec<_>>();
// Fix rowid alias columns: replace Null with actual rowid value
if let Some(table) = schema.get_table(table_name) {
for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < values.len() {
values[i] = Value::Integer(key);
}
}
}
Some((key, values))
} else {
None
}
} else {
None
};
// Now perform the deletion
return_if_io!(cursor.delete());
result
};
// Update dependent views for incremental computation
if let Some((key, values)) = record_key_and_values {
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
for view_name in dependent_views {
let tx_state = tx_states.entry(view_name).or_default();
tx_state.delta.delete(key, values.clone());
}
}
let prev_changes = program.n_change.get();
program.n_change.set(prev_changes + 1);
@@ -6039,6 +6154,26 @@ pub fn op_drop_table(
Ok(InsnFunctionStepResult::Step)
}
pub fn op_drop_view(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
_pager: &Rc<Pager>,
_mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(DropView { db, view_name }, insn);
if *db > 0 {
todo!("temp databases not implemented yet");
}
let conn = program.connection.clone();
conn.with_schema_mut(|schema| {
schema.remove_view(view_name);
Ok::<(), crate::LimboError>(())
})?;
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
pub fn op_close(
program: &Program,
state: &mut ProgramState,
@@ -6139,14 +6274,28 @@ pub fn op_parse_schema(
conn.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
parse_schema_rows(stmt, schema, &conn.syms.borrow(), state.mv_tx_id)
let existing_views = schema.views.clone();
parse_schema_rows(
stmt,
schema,
&conn.syms.borrow(),
state.mv_tx_id,
existing_views,
)
})?;
} else {
let stmt = conn.prepare("SELECT * FROM sqlite_schema")?;
conn.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
parse_schema_rows(stmt, schema, &conn.syms.borrow(), state.mv_tx_id)
let existing_views = schema.views.clone();
parse_schema_rows(
stmt,
schema,
&conn.syms.borrow(),
state.mv_tx_id,
existing_views,
)
})?;
}
conn.auto_commit.set(previous_auto_commit);
@@ -6154,6 +6303,29 @@ pub fn op_parse_schema(
Ok(InsnFunctionStepResult::Step)
}
pub fn op_populate_views(
program: &Program,
state: &mut ProgramState,
_insn: &Insn,
_pager: &Rc<Pager>,
_mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
let conn = program.connection.clone();
let schema = conn.schema.borrow();
match schema.populate_views(&conn)? {
IOResult::Done(()) => {
// All views populated, advance to next instruction
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
IOResult::IO => {
// Need more IO, stay on this instruction
Ok(InsnFunctionStepResult::IO)
}
}
}
pub fn op_read_cookie(
program: &Program,
state: &mut ProgramState,

View File

@@ -1280,6 +1280,15 @@ pub fn insn_to_str(
0,
format!("DROP TABLE {table_name}"),
),
Insn::DropView { db, view_name } => (
"DropView",
*db as i32,
0,
0,
Value::build_text(view_name),
0,
format!("DROP VIEW {view_name}"),
),
Insn::DropIndex { db: _, index } => (
"DropIndex",
0,
@@ -1328,6 +1337,15 @@ pub fn insn_to_str(
0,
where_clause.clone().unwrap_or("NULL".to_string()),
),
Insn::PopulateViews => (
"PopulateViews",
0,
0,
0,
Value::Null,
0,
"".to_string(),
),
Insn::Prev {
cursor_id,
pc_if_prev,

View File

@@ -122,6 +122,11 @@ impl InsertFlags {
self.0 |= InsertFlags::REQUIRE_SEEK;
self
}
pub fn update(mut self) -> Self {
self.0 |= InsertFlags::UPDATE;
self
}
}
#[derive(Clone, Copy, Debug)]
@@ -845,6 +850,12 @@ pub enum Insn {
// The name of the table being dropped
table_name: String,
},
DropView {
/// The database within which this view needs to be dropped
db: usize,
/// The name of the view being dropped
view_name: String,
},
DropIndex {
/// The database within which this index needs to be dropped (P1).
db: usize,
@@ -886,6 +897,9 @@ pub enum Insn {
where_clause: Option<String>,
},
/// Populate all views after schema parsing
PopulateViews,
/// Place the result of lhs >> rhs in dest register.
ShiftRight {
lhs: usize,
@@ -1161,10 +1175,12 @@ impl Insn {
Insn::Destroy { .. } => execute::op_destroy,
Insn::DropTable { .. } => execute::op_drop_table,
Insn::DropView { .. } => execute::op_drop_view,
Insn::Close { .. } => execute::op_close,
Insn::IsNull { .. } => execute::op_is_null,
Insn::CollSeq { .. } => execute::op_coll_seq,
Insn::ParseSchema { .. } => execute::op_parse_schema,
Insn::PopulateViews => execute::op_populate_views,
Insn::ShiftRight { .. } => execute::op_shift_right,
Insn::ShiftLeft { .. } => execute::op_shift_left,
Insn::AddImm { .. } => execute::op_add_imm,

View File

@@ -443,6 +443,25 @@ impl Program {
}
#[instrument(skip_all, level = Level::DEBUG)]
fn apply_view_deltas(&self, rollback: bool) {
if self.connection.view_transaction_states.borrow().is_empty() {
return;
}
let tx_states = self.connection.view_transaction_states.take();
if !rollback {
let schema = self.connection.schema.borrow();
for (view_name, tx_state) in tx_states.iter() {
if let Some(view_mutex) = schema.get_view(view_name) {
let mut view = view_mutex.lock().unwrap();
view.merge_delta(&tx_state.delta);
}
}
}
}
pub fn commit_txn(
&self,
pager: Rc<Pager>,
@@ -450,6 +469,8 @@ impl Program {
mv_store: Option<&Arc<MvStore>>,
rollback: bool,
) -> Result<StepResult> {
self.apply_view_deltas(rollback);
if self.connection.transaction_state.get() == TransactionState::None && mv_store.is_none() {
// No need to do any work here if not in tx. Current MVCC logic doesn't work with this assumption,
// hence the mv_store.is_none() check.

View File

@@ -6,7 +6,7 @@ use fallible_iterator::FallibleIterator;
use std::ffi::c_void;
use std::ptr::NonNull;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use turso_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VTabModuleImpl};
use turso_sqlite3_parser::{ast, lexer::sql::Parser};
@@ -14,6 +14,7 @@ use turso_sqlite3_parser::{ast, lexer::sql::Parser};
pub(crate) enum VirtualTableType {
Pragma(PragmaVirtualTable),
External(ExtVirtualTable),
View(crate::vtab_view::ViewVirtualTable),
}
#[derive(Clone, Debug)]
@@ -29,6 +30,7 @@ impl VirtualTable {
match &self.vtab_type {
VirtualTableType::Pragma(_) => true,
VirtualTableType::External(table) => table.readonly(),
VirtualTableType::View(_) => true,
}
}
@@ -86,6 +88,21 @@ impl VirtualTable {
Ok(Arc::new(vtab))
}
/// Create a virtual table for a view
pub(crate) fn view(
view_name: &str,
columns: Vec<Column>,
view: Arc<Mutex<crate::incremental::view::IncrementalView>>,
) -> crate::Result<Arc<VirtualTable>> {
let vtab = VirtualTable {
name: view_name.to_owned(),
columns,
kind: VTabKind::VirtualTable,
vtab_type: VirtualTableType::View(crate::vtab_view::ViewVirtualTable { view }),
};
Ok(Arc::new(vtab))
}
fn resolve_columns(schema: String) -> crate::Result<Vec<Column>> {
let mut parser = Parser::new(schema.as_bytes());
if let ast::Cmd::Stmt(ast::Stmt::CreateTable { body, .. }) = parser.next()?.ok_or(
@@ -107,6 +124,9 @@ impl VirtualTable {
VirtualTableType::External(table) => {
Ok(VirtualTableCursor::External(table.open(conn.clone())?))
}
VirtualTableType::View(table) => {
Ok(VirtualTableCursor::View(Box::new(table.open(conn)?)))
}
}
}
@@ -114,6 +134,7 @@ impl VirtualTable {
match &self.vtab_type {
VirtualTableType::Pragma(_) => Err(LimboError::ReadOnly),
VirtualTableType::External(table) => table.update(args),
VirtualTableType::View(_) => Err(LimboError::ReadOnly),
}
}
@@ -121,6 +142,7 @@ impl VirtualTable {
match &self.vtab_type {
VirtualTableType::Pragma(_) => Ok(()),
VirtualTableType::External(table) => table.destroy(),
VirtualTableType::View(_) => Ok(()),
}
}
@@ -132,6 +154,7 @@ impl VirtualTable {
match &self.vtab_type {
VirtualTableType::Pragma(table) => table.best_index(constraints),
VirtualTableType::External(table) => table.best_index(constraints, order_by),
VirtualTableType::View(view) => view.best_index(),
}
}
}
@@ -139,6 +162,7 @@ impl VirtualTable {
pub enum VirtualTableCursor {
Pragma(Box<PragmaVirtualTableCursor>),
External(ExtVirtualTableCursor),
View(Box<crate::vtab_view::ViewVirtualTableCursor>),
}
impl VirtualTableCursor {
@@ -146,6 +170,7 @@ impl VirtualTableCursor {
match self {
VirtualTableCursor::Pragma(cursor) => cursor.next(),
VirtualTableCursor::External(cursor) => cursor.next(),
VirtualTableCursor::View(cursor) => cursor.next(),
}
}
@@ -153,6 +178,7 @@ impl VirtualTableCursor {
match self {
VirtualTableCursor::Pragma(cursor) => cursor.rowid(),
VirtualTableCursor::External(cursor) => cursor.rowid(),
VirtualTableCursor::View(cursor) => cursor.rowid(),
}
}
@@ -160,6 +186,7 @@ impl VirtualTableCursor {
match self {
VirtualTableCursor::Pragma(cursor) => cursor.column(column),
VirtualTableCursor::External(cursor) => cursor.column(column),
VirtualTableCursor::View(cursor) => cursor.column(column),
}
}
@@ -175,6 +202,7 @@ impl VirtualTableCursor {
VirtualTableCursor::External(cursor) => {
cursor.filter(idx_num, idx_str, arg_count, args)
}
VirtualTableCursor::View(cursor) => cursor.filter(args),
}
}
}

core/vtab_view.rs (new file, 101 lines)
View File

@@ -0,0 +1,101 @@
use crate::incremental::view::IncrementalView;
use crate::{Connection, LimboError, Value, VirtualTable};
use std::sync::{Arc, Mutex};
/// Create a virtual table wrapper for a view
pub fn create_view_virtual_table(
view_name: &str,
view: Arc<Mutex<IncrementalView>>,
) -> crate::Result<Arc<VirtualTable>> {
// Use the VirtualTable::view method we added
let view_locked = view.lock().map_err(|_| {
LimboError::InternalError("Failed to lock view for virtual table creation".to_string())
})?;
let columns = view_locked.columns.clone();
drop(view_locked); // Release the lock before passing the Arc
VirtualTable::view(view_name, columns, view)
}
/// Virtual table wrapper for incremental views
#[derive(Clone, Debug)]
pub struct ViewVirtualTable {
pub view: Arc<Mutex<IncrementalView>>,
}
impl ViewVirtualTable {
pub fn best_index(&self) -> Result<turso_ext::IndexInfo, turso_ext::ResultCode> {
// Views don't use indexes - return a simple index info
Ok(turso_ext::IndexInfo {
idx_num: 0,
idx_str: None,
order_by_consumed: false,
estimated_cost: 1000000.0,
estimated_rows: 1000,
constraint_usages: Vec::new(),
})
}
pub fn open(&self, conn: Arc<Connection>) -> crate::Result<ViewVirtualTableCursor> {
// Views are now populated during schema parsing (in parse_schema_rows)
// so we just get the current data from the view.
let view = self.view.lock().map_err(|_| {
LimboError::InternalError("Failed to lock view for reading".to_string())
})?;
let tx_states = conn.view_transaction_states.borrow();
let tx_state = tx_states.get(view.name());
let data: Vec<(i64, Vec<Value>)> = view.current_data(tx_state);
Ok(ViewVirtualTableCursor {
data,
current_pos: 0,
})
}
}
/// Cursor for iterating over view data
pub struct ViewVirtualTableCursor {
data: Vec<(i64, Vec<Value>)>,
current_pos: usize,
}
impl ViewVirtualTableCursor {
pub fn next(&mut self) -> crate::Result<bool> {
if self.current_pos < self.data.len() {
self.current_pos += 1;
Ok(self.current_pos < self.data.len())
} else {
Ok(false)
}
}
pub fn rowid(&self) -> i64 {
if self.current_pos < self.data.len() {
self.data[self.current_pos].0
} else {
-1
}
}
pub fn column(&self, column: usize) -> crate::Result<Value> {
if self.current_pos >= self.data.len() {
return Ok(Value::Null);
}
let (_row_key, values) = &self.data[self.current_pos];
// Return the value at the requested column index
if let Some(value) = values.get(column) {
Ok(value.clone())
} else {
Ok(Value::Null)
}
}
pub fn filter(&mut self, _args: Vec<Value>) -> crate::Result<bool> {
// Reset to beginning for new filter
self.current_pos = 0;
Ok(!self.data.is_empty())
}
}
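The cursor follows the same contract as the other virtual table cursors: `filter` rewinds and reports whether any row exists, `next` advances and reports whether one remains, and `rowid`/`column` read the current position. A usage sketch under that assumption (`view_table` and `conn` are placeholders):

// `view_table` is a ViewVirtualTable, `conn` an Arc<Connection>.
let mut cursor = view_table.open(conn.clone())?;
let mut has_row = cursor.filter(Vec::new())?; // rewind; false if the view is empty
while has_row {
    let rowid = cursor.rowid();
    let first = cursor.column(0)?;
    println!("{rowid}: {first:?}");
    has_row = cursor.next()?;
}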

View File

@@ -31,6 +31,7 @@ impl TempDatabase {
turso_core::OpenFlags::default(),
false,
enable_indexes,
false,
)
.unwrap();
Self { path, io, db }
@@ -56,6 +57,7 @@ impl TempDatabase {
flags,
false,
enable_indexes,
false,
)
.unwrap();
Self {
@@ -85,6 +87,7 @@ impl TempDatabase {
turso_core::OpenFlags::default(),
false,
enable_indexes,
false,
)
.unwrap();

View File

@@ -706,6 +706,7 @@ fn test_wal_bad_frame() -> anyhow::Result<()> {
turso_core::OpenFlags::default(),
false,
false,
false,
)
.unwrap();
let tmp_db = TempDatabase {