re-parse schema if necessary after WAL sync end

This commit is contained in:
Nikita Sivukhin
2025-07-23 21:18:28 +04:00
parent edd6ef2d21
commit 6daa6d07f1
3 changed files with 145 additions and 21 deletions

View File

@@ -41,6 +41,7 @@ mod numeric;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use crate::storage::header_accessor::get_schema_cookie;
use crate::storage::sqlite3_ondisk::is_valid_page_size;
use crate::storage::{header_accessor, wal::DummyWAL};
use crate::translate::optimizer::optimize_plan;
@@ -446,11 +447,35 @@ impl Database {
}
/// Runs `f` with mutable access to the database-wide schema and returns whatever `f` returns.
///
/// The schema mutex is held for the duration of the call; failing to lock it is reported as
/// `LimboError::SchemaLocked`. The mutable reference comes from `Arc::make_mut`, so if the
/// schema `Arc` is shared at that moment the closure mutates a fresh copy that is stored back
/// into the mutex slot, leaving previously handed-out `Arc`s untouched.
#[inline]
pub fn with_schema_mut<T>(&self, f: impl FnOnce(&mut Schema) -> Result<T>) -> Result<T> {
pub(crate) fn with_schema_mut<T>(&self, f: impl FnOnce(&mut Schema) -> Result<T>) -> Result<T> {
let mut schema_ref = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?;
let schema = Arc::make_mut(&mut *schema_ref);
f(schema)
}
/// Returns another handle to the schema currently stored on the database.
///
/// Only the `Arc` is cloned, not the `Schema` behind it. Fails with
/// `LimboError::SchemaLocked` when the schema mutex cannot be locked.
pub(crate) fn clone_schema(&self) -> Result<Arc<Schema>> {
    let guard = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?;
    let shared: &Arc<Schema> = &guard;
    Ok(Arc::clone(shared))
}
/// Installs `another` as the database-wide schema, but only when its `schema_version` is
/// strictly greater than the version currently stored.
///
/// An equal or older candidate is dropped and the stored schema is left as is. Either outcome
/// is reported through a `tracing::debug!` event. Fails with `LimboError::SchemaLocked` when
/// the schema mutex cannot be locked.
pub(crate) fn update_schema_if_newer(&self, another: Arc<Schema>) -> Result<()> {
    let mut current = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?;

    // Guard clause: nothing to do unless the candidate is strictly newer.
    if current.schema_version >= another.schema_version {
        tracing::debug!(
            "DB schema is up to date: {} >= {}",
            current.schema_version,
            another.schema_version
        );
        return Ok(());
    }

    tracing::debug!(
        "DB schema is outdated: {} < {}",
        current.schema_version,
        another.schema_version
    );
    *current = another;
    Ok(())
}
}
fn get_schema_version(conn: &Arc<Connection>) -> Result<u32> {
@@ -618,6 +643,35 @@ impl Connection {
}
}
/// Re-parses `sqlite_schema` into the connection schema when the connection's
/// `schema_version` differs from the schema cookie read through the pager, then offers the
/// result to the database via `update_schema_if_newer`.
///
/// Does nothing when both versions already match. Asserts that the connection never holds a
/// version greater than the database cookie.
fn maybe_reparse_schema(self: &Arc<Connection>) -> Result<()> {
    let pager = self.pager.borrow().clone();

    // Read the cookie inside a short read transaction. The lookup result is unwrapped only
    // after the transaction is ended, so a failed lookup does not leave it open.
    pager.begin_read_tx()?;
    let cookie_lookup = get_schema_cookie(&pager);
    pager.end_read_tx().expect("read txn must be finished");
    let db_cookie = cookie_lookup?;

    let connection_cookie = self.schema.borrow().schema_version;
    turso_assert!(
        connection_cookie <= db_cookie,
        "connection cookie can't be larger than db cookie: {} vs {}",
        connection_cookie,
        db_cookie
    );

    // Versions agree: the in-memory schema is current.
    if connection_cookie == db_cookie {
        return Ok(());
    }

    let stmt = self.prepare("SELECT * FROM sqlite_schema")?;
    self.with_schema_mut(|schema| {
        // TODO: This function below is synchronous, make it async
        parse_schema_rows(Some(stmt), schema, &self.syms.borrow(), None)
    })?;

    // Publish the re-parsed schema; the database keeps it only if it is newer than its own.
    let reparsed = self.schema.borrow().clone();
    self._db.update_schema_if_newer(reparsed)
}
#[instrument(skip_all, level = Level::INFO)]
pub fn query(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Option<Statement>> {
if self.closed.get() {
@@ -841,15 +895,19 @@ impl Connection {
/// All frames written after last commit frame (db_size > 0) within the session will be rolled back
///
/// After the WAL write and read transactions are ended, the schema is checked via
/// `maybe_reparse_schema` and that call's result is returned.
#[cfg(feature = "fs")]
pub fn wal_insert_end(self: &Arc<Connection>) -> Result<()> {
let pager = self.pager.borrow();
// NOTE(review): the inner scope ends the `pager` and `wal` borrows before
// `maybe_reparse_schema` runs - presumably to avoid overlapping RefCell borrows; confirm.
{
let pager = self.pager.borrow();
// remove all non-committed changes in case the WAL session left some suffix without a commit frame
pager.rollback(false, self).expect("rollback must succeed");
// remove all non-committed changes in case the WAL session left some suffix without a commit frame
pager.rollback(false, self).expect("rollback must succeed");
let wal = pager.wal.borrow_mut();
wal.end_write_tx();
wal.end_read_tx();
Ok(())
let wal = pager.wal.borrow_mut();
wal.end_write_tx();
wal.end_read_tx();
}
// re-parse the schema from scratch if the schema cookie changed compared to our in-memory view of the schema
self.maybe_reparse_schema()
}
/// Flush dirty pages to disk.

View File

@@ -790,11 +790,7 @@ impl Pager {
if schema_did_change {
let schema = connection.schema.borrow().clone();
*connection
._db
.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)? = schema;
connection._db.update_schema_if_newer(schema)?;
}
Ok(commit_status)
}
@@ -1445,14 +1441,7 @@ impl Pager {
cache.unset_dirty_all_pages();
cache.clear().expect("failed to clear page cache");
if schema_did_change {
connection.schema.replace(
connection
._db
.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)?
.clone(),
);
connection.schema.replace(connection._db.clone_schema()?);
}
self.wal.borrow_mut().rollback()?;

View File

@@ -1,6 +1,7 @@
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use rusqlite::types::Value;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use crate::common::{limbo_exec_rows, rng_from_time, TempDatabase};
@@ -58,6 +59,46 @@ fn test_wal_frame_transfer_no_schema_changes() {
);
}
/// Copies every WAL frame of a database whose schema was changed (`CREATE TABLE`) into a second,
/// empty database and checks that the receiving connection can query the new table afterwards.
#[test]
fn test_wal_frame_transfer_schema_changes() {
    // Best-effort logging setup. `try_init` returns an error when a global subscriber (or
    // logger) has already been installed by another test in this process; that must not fail
    // this test, so the result is deliberately ignored instead of unwrapped.
    let _ = tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .with(EnvFilter::from_default_env())
        .try_init();

    let db1 = TempDatabase::new_empty(false);
    let conn1 = db1.connect_limbo();
    let db2 = TempDatabase::new_empty(false);
    let conn2 = db2.connect_limbo();

    // Source database: one schema change followed by two inserts.
    conn1
        .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
        .unwrap();
    conn1
        .execute("INSERT INTO t VALUES (10, 10), (5, 1)")
        .unwrap();
    conn1
        .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 10))")
        .unwrap();
    assert_eq!(conn1.wal_frame_count().unwrap(), 15);

    // Frame buffer of 24 + 4096 bytes - presumably a WAL frame header plus one page; confirm
    // against the configured page size.
    let mut frame = [0u8; 24 + 4096];

    // Replay all frames, including the final commit frame, into the second database.
    conn2.wal_insert_begin().unwrap();
    for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 {
        let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap();
        db1.io.wait_for_completion(c).unwrap();
        conn2.wal_insert_frame(frame_id, &frame).unwrap();
    }
    conn2.wal_insert_end().unwrap();

    assert_eq!(conn2.wal_frame_count().unwrap(), 15);
    // The table created on `conn1` must be visible on `conn2` after the WAL session ends.
    assert_eq!(
        limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"),
        vec![
            vec![Value::Integer(5), Value::Integer(1)],
            vec![Value::Integer(10), Value::Integer(2)],
            vec![Value::Integer(1024), Value::Integer(40960)],
        ]
    );
}
#[test]
fn test_wal_frame_transfer_no_schema_changes_rollback() {
let db1 = TempDatabase::new_empty(false);
@@ -97,6 +138,42 @@ fn test_wal_frame_transfer_no_schema_changes_rollback() {
);
}
/// Transfers all WAL frames except the last one into a second database, so the trailing
/// insert is left without its final frame, then checks that table `t` is queryable but empty
/// on the receiving side - both right after the WAL session and after further writes.
#[test]
fn test_wal_frame_transfer_schema_changes_rollback() {
    let db1 = TempDatabase::new_empty(false);
    let conn1 = db1.connect_limbo();
    let db2 = TempDatabase::new_empty(false);
    let conn2 = db2.connect_limbo();

    conn1
        .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
        .unwrap();
    conn1
        .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 10))")
        .unwrap();
    assert_eq!(conn1.wal_frame_count().unwrap(), 14);

    let mut frame_buf = [0u8; 24 + 4096];
    conn2.wal_insert_begin().unwrap();
    // Stop one frame short of the end of the source WAL.
    let last_copied = conn1.wal_frame_count().unwrap() as u32 - 1;
    for id in 1..=last_copied {
        let completion = conn1.wal_get_frame(id, &mut frame_buf).unwrap();
        db1.io.wait_for_completion(completion).unwrap();
        conn2.wal_insert_frame(id, &frame_buf).unwrap();
    }
    conn2.wal_insert_end().unwrap();

    // Only two frames remain on the receiving side after the WAL session is ended.
    assert_eq!(conn2.wal_frame_count().unwrap(), 2);
    assert_eq!(
        limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"),
        Vec::<Vec<Value>>::new()
    );

    // Further writes on the receiving connection must not make rows appear in `t`.
    conn2.execute("CREATE TABLE q(x)").unwrap();
    conn2
        .execute("INSERT INTO q VALUES (randomblob(4096 * 10))")
        .unwrap();
    assert_eq!(
        limbo_exec_rows(&db2, &conn2, "SELECT x, LENGTH(y) FROM t"),
        Vec::<Vec<Value>>::new()
    );
}
#[test]
fn test_wal_frame_conflict() {
let db1 = TempDatabase::new_empty(false);