Merge 'Rewrite the WAL' from Preston Thorpe

closes #1893
Adds some fairly extensive tests but I'll continue to add some python
tests on top of the unit tests.
## Restart:
tested 
- open new DB
- create table and do a bunch of inserts
- `pragma wal_checkpoint(RESTART);`
- close db file
- re-open and verify we can read the wal/repopulate the frame cache
- verify min|max frame
tested 
- open same DB
- add more inserts
- `pragma wal_checkpoint(RESTART);`
- do _more_ inserts
- close
- re-open
- verify checksums/max_frame are valid
- verify row count
## Truncate
tested 
- open new db
- create table and add inserts
- `pragma wal_checkpoint(truncate);`
- close file
- verify WAL file is empty (32 bytes, header only)
- re-open file
- verify content/row count
tested 
- open db
- create table and insert many rows
- `pragma wal_checkpoint(truncate);`
- insert _more_ rows
- close db file
- verify WAL file is valid
- re-open file
- verify we can read entire file/repopulate the frame cache
<img width="541" height="315" alt="image" src="https://github.com/user-attachments/assets/0470c795-5116-4866-b913-78c07b06b68c" />
```
# header
magic=0x377f0682
version=3007000
page_size=4096
seq=2
salt=ec475ff2-7ea94342
checksum=c9464aff-c571cc22
```

Closes #2179
This commit is contained in:
Jussi Saurio
2025-07-30 18:50:49 +03:00
30 changed files with 2990 additions and 353 deletions

2
Cargo.lock generated
View File

@@ -4136,6 +4136,8 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
name = "turso"
version = "0.1.3"
dependencies = [
"rand 0.8.5",
"rand_chacha 0.3.1",
"tempfile",
"thiserror 2.0.12",
"tokio",

View File

@@ -726,6 +726,14 @@ impl turso_core::DatabaseStorage for DatabaseFile {
fn size(&self) -> turso_core::Result<u64> {
self.file.size()
}
fn truncate(
&self,
len: usize,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
let c = self.file.truncate(len, c)?;
Ok(c)
}
}
#[inline]

View File

@@ -21,3 +21,5 @@ thiserror = "2.0.9"
[dev-dependencies]
tempfile = "3.20.0"
tokio = { version = "1.29.1", features = ["full"] }
rand = "0.8.5"
rand_chacha = "0.3.1"

View File

@@ -0,0 +1,569 @@
use rand::seq::SliceRandom;
use rand::Rng;
use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng};
use std::collections::HashMap;
use turso::{Builder, Value};
// In-memory representation of the database state
#[derive(Debug, Clone, PartialEq, Eq)]
struct DbRow {
    // Primary key of test_table.
    id: i64,
    // Payload column; compared verbatim against what the real database returns.
    text: String,
}
impl std::fmt::Display for DbRow {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "(id={}, text={})", self.id, self.text)
}
}
#[derive(Debug, Clone)]
struct TransactionState {
    // What this transaction can see (snapshot)
    visible_rows: HashMap<i64, DbRow>,
    // Pending changes in this transaction
    pending_changes: Vec<Operation>,
}
// Model of the expected database state, maintained in lockstep with the real
// database so SELECT results can be verified against it.
#[derive(Debug)]
struct ShadowDb {
    // Committed state (what's actually in the database)
    committed_rows: HashMap<i64, DbRow>,
    // Transaction states; `None` means BEGIN DEFERRED was issued but no
    // snapshot has been taken yet (taken lazily on first use).
    transactions: HashMap<usize, Option<TransactionState>>,
}
impl ShadowDb {
    /// Creates an empty shadow database: no committed rows, no transactions.
    fn new() -> Self {
        Self {
            committed_rows: HashMap::new(),
            transactions: HashMap::new(),
        }
    }

    /// Registers a transaction. `immediate` transactions snapshot the
    /// committed state right away (BEGIN IMMEDIATE); deferred ones store
    /// `None` until `take_snapshot` runs (BEGIN DEFERRED semantics).
    fn begin_transaction(&mut self, tx_id: usize, immediate: bool) {
        let state = immediate.then(|| TransactionState {
            visible_rows: self.committed_rows.clone(),
            pending_changes: Vec::new(),
        });
        self.transactions.insert(tx_id, state);
    }

    /// Materializes the snapshot of a deferred transaction.
    /// Panics if the transaction already has a snapshot (test bug).
    fn take_snapshot(&mut self, tx_id: usize) {
        if let Some(tx_state) = self.transactions.get_mut(&tx_id) {
            assert!(tx_state.is_none());
            tx_state.replace(TransactionState {
                visible_rows: self.committed_rows.clone(),
                pending_changes: Vec::new(),
            });
        }
    }

    /// Applies the transaction's pending operations to the committed state in
    /// order, then forgets the transaction. Panics if it never took a
    /// snapshot. Only data operations may appear in `pending_changes`.
    fn commit_transaction(&mut self, tx_id: usize) {
        if let Some(tx_state) = self.transactions.remove(&tx_id) {
            let tx_state = tx_state.unwrap();
            for op in tx_state.pending_changes {
                match op {
                    // Insert and Update have identical replay semantics here.
                    Operation::Insert { id, text } | Operation::Update { id, text } => {
                        self.committed_rows.insert(id, DbRow { id, text });
                    }
                    Operation::Delete { id } => {
                        self.committed_rows.remove(&id);
                    }
                    other => unreachable!("Unexpected operation: {other}"),
                }
            }
        }
    }

    /// Discards the transaction and all of its pending changes.
    fn rollback_transaction(&mut self, tx_id: usize) {
        self.transactions.remove(&tx_id);
    }

    /// Inserts a row into the transaction's view; errors if the id is already
    /// visible (mirrors a UNIQUE constraint) or there is no transaction.
    /// Panics if the transaction exists but never took a snapshot.
    fn insert(&mut self, tx_id: usize, id: i64, text: String) -> Result<(), String> {
        let Some(tx_state) = self.transactions.get_mut(&tx_id) else {
            return Err("No active transaction".to_string());
        };
        // Bind the snapshot once instead of repeated `as_mut().unwrap()`.
        let tx_state = tx_state.as_mut().unwrap();
        if tx_state.visible_rows.contains_key(&id) {
            return Err("UNIQUE constraint failed".to_string());
        }
        tx_state.visible_rows.insert(
            id,
            DbRow {
                id,
                text: text.clone(),
            },
        );
        tx_state.pending_changes.push(Operation::Insert { id, text });
        Ok(())
    }

    /// Updates a row visible to the transaction; errors if the row is not
    /// visible or there is no transaction.
    fn update(&mut self, tx_id: usize, id: i64, text: String) -> Result<(), String> {
        let Some(tx_state) = self.transactions.get_mut(&tx_id) else {
            return Err("No active transaction".to_string());
        };
        let tx_state = tx_state.as_mut().unwrap();
        if !tx_state.visible_rows.contains_key(&id) {
            return Err("Row not found".to_string());
        }
        tx_state.visible_rows.insert(
            id,
            DbRow {
                id,
                text: text.clone(),
            },
        );
        tx_state.pending_changes.push(Operation::Update { id, text });
        Ok(())
    }

    /// Deletes a row visible to the transaction; errors if the row is not
    /// visible or there is no transaction.
    fn delete(&mut self, tx_id: usize, id: i64) -> Result<(), String> {
        let Some(tx_state) = self.transactions.get_mut(&tx_id) else {
            return Err("No active transaction".to_string());
        };
        let tx_state = tx_state.as_mut().unwrap();
        if !tx_state.visible_rows.contains_key(&id) {
            return Err("Row not found".to_string());
        }
        tx_state.visible_rows.remove(&id);
        tx_state.pending_changes.push(Operation::Delete { id });
        Ok(())
    }

    /// Returns the rows visible to `tx_id` (its snapshot plus its own pending
    /// changes), or the committed state when no/unknown transaction is given.
    /// Panics if the transaction exists but never took a snapshot.
    fn get_visible_rows(&self, tx_id: Option<usize>) -> Vec<DbRow> {
        match tx_id.and_then(|id| self.transactions.get(&id)) {
            Some(tx_state) => tx_state
                .as_ref()
                .unwrap()
                .visible_rows
                .values()
                .cloned()
                .collect(),
            None => self.committed_rows.values().cloned().collect(),
        }
    }
}
// A single fuzz action. Begin/Commit/Rollback drive transaction state;
// Insert/Update/Delete mutate data; Select verifies real vs. shadow state.
#[derive(Debug, Clone)]
enum Operation {
    Begin,
    Commit,
    Rollback,
    Insert { id: i64, text: String },
    Update { id: i64, text: String },
    Delete { id: i64 },
    Select,
}
impl std::fmt::Display for Operation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Operation::Begin => write!(f, "BEGIN"),
Operation::Commit => write!(f, "COMMIT"),
Operation::Rollback => write!(f, "ROLLBACK"),
Operation::Insert { id, text } => {
write!(f, "INSERT INTO test_table (id, text) VALUES ({id}, {text})")
}
Operation::Update { id, text } => {
write!(f, "UPDATE test_table SET text = {text} WHERE id = {id}")
}
Operation::Delete { id } => write!(f, "DELETE FROM test_table WHERE id = {id}"),
Operation::Select => write!(f, "SELECT * FROM test_table"),
}
}
}
/// Builds a deterministic RNG seeded from the `SEED` environment variable
/// when set, otherwise from the current wall-clock time in milliseconds.
/// Returns the RNG together with the seed so failing runs can print it.
fn rng_from_time_or_env() -> (ChaCha8Rng, u64) {
    // `map_or` evaluates its default eagerly, which would query the system
    // clock even when SEED is set; `map_or_else` defers that work.
    let seed = std::env::var("SEED").map_or_else(
        |_| {
            // Millis since epoch only exceed u64 far in the future; the cast
            // matches the original truncating behavior.
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_millis() as u64
        },
        |v| {
            // Parse directly as u64 (as the message promises) instead of
            // inferring u128 and silently truncating afterwards.
            v.parse::<u64>()
                .expect("Failed to parse SEED environment variable as u64")
        },
    );
    (ChaCha8Rng::seed_from_u64(seed), seed)
}
#[tokio::test]
/// Verify transaction isolation semantics with multiple concurrent connections.
/// This test is ignored because it still fails sometimes; unsure if it fails due to a bug in the test or a bug in the implementation.
/// NOTE(review): no `#[ignore]` attribute is present despite the comment
/// above — confirm whether one should be added.
///
/// Strategy: every operation is executed against the real database first and
/// mirrored into the shared `ShadowDb` only on success; a SELECT then
/// compares the real result set against the shadow's visible rows.
/// "database is locked" (and "Busy" for ROLLBACK) are the only tolerated
/// errors; anything else panics with the seed for reproduction.
async fn test_multiple_connections_fuzz() {
    let (mut rng, seed) = rng_from_time_or_env();
    println!("Multiple connections fuzz test seed: {seed}");
    const NUM_ITERATIONS: usize = 50;
    const OPERATIONS_PER_CONNECTION: usize = 30;
    const NUM_CONNECTIONS: usize = 2;
    for iteration in 0..NUM_ITERATIONS {
        // Create a fresh database for each iteration
        let tempfile = tempfile::NamedTempFile::new().unwrap();
        let db = Builder::new_local(tempfile.path().to_str().unwrap())
            .build()
            .await
            .unwrap();
        // SHARED shadow database for all connections
        let mut shared_shadow_db = ShadowDb::new();
        // Monotonic id so shadow transactions never collide across connections.
        let mut next_tx_id = 0;
        // Create connections
        let mut connections = Vec::new();
        for conn_id in 0..NUM_CONNECTIONS {
            let conn = db.connect().unwrap();
            // Create table if it doesn't exist
            conn.execute(
                "CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, text TEXT)",
                (),
            )
            .await
            .unwrap();
            connections.push((conn, conn_id, None::<usize>)); // (connection, conn_id, current_tx_id)
        }
        // Interleave operations between all connections
        for op_num in 0..OPERATIONS_PER_CONNECTION {
            for (conn, conn_id, current_tx_id) in &mut connections {
                // Generate operation based on current transaction state
                let visible_rows = if let Some(tx_id) = *current_tx_id {
                    // Take snapshot during first operation after a BEGIN, not immediately at BEGIN (the semantics is BEGIN DEFERRED)
                    let tx_state = shared_shadow_db.transactions.get(&tx_id).unwrap();
                    if tx_state.is_none() {
                        shared_shadow_db.take_snapshot(tx_id);
                    }
                    shared_shadow_db.get_visible_rows(Some(tx_id))
                } else {
                    shared_shadow_db.get_visible_rows(None) // No transaction
                };
                let operation =
                    generate_operation(&mut rng, current_tx_id.is_some(), &visible_rows);
                println!("Connection {conn_id}(op={op_num}): {operation}");
                match operation {
                    Operation::Begin => {
                        // Deferred begin: the shadow snapshot is taken lazily
                        // on the first subsequent operation (see above).
                        shared_shadow_db.begin_transaction(next_tx_id, false);
                        *current_tx_id = Some(next_tx_id);
                        next_tx_id += 1;
                        conn.execute("BEGIN", ()).await.unwrap();
                    }
                    Operation::Commit => {
                        let Some(tx_id) = *current_tx_id else {
                            panic!("Connection {conn_id}(op={op_num}) FAILED: No transaction");
                        };
                        // Try real DB commit first
                        let result = conn.execute("COMMIT", ()).await;
                        match result {
                            Ok(_) => {
                                // Success - update shadow DB
                                shared_shadow_db.commit_transaction(tx_id);
                                *current_tx_id = None;
                            }
                            Err(e) => {
                                println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
                                // A failed commit drops the transaction in the
                                // shadow too, so both sides stay aligned.
                                if let Some(tx_id) = *current_tx_id {
                                    shared_shadow_db.rollback_transaction(tx_id);
                                    *current_tx_id = None;
                                }
                                // Check if it's an acceptable error
                                if !e.to_string().contains("database is locked") {
                                    panic!("Unexpected error during commit: {e}");
                                }
                            }
                        }
                    }
                    Operation::Rollback => {
                        if let Some(tx_id) = *current_tx_id {
                            // Try real DB rollback first
                            let result = conn.execute("ROLLBACK", ()).await;
                            match result {
                                Ok(_) => {
                                    // Success - update shadow DB
                                    shared_shadow_db.rollback_transaction(tx_id);
                                    *current_tx_id = None;
                                }
                                Err(e) => {
                                    println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
                                    // Rollback failure still abandons the shadow
                                    // transaction.
                                    shared_shadow_db.rollback_transaction(tx_id);
                                    *current_tx_id = None;
                                    // Check if it's an acceptable error
                                    if !e.to_string().contains("Busy")
                                        && !e.to_string().contains("database is locked")
                                    {
                                        panic!("Unexpected error during rollback: {e}");
                                    }
                                }
                            }
                        }
                    }
                    Operation::Insert { id, text } => {
                        let result = conn
                            .execute(
                                "INSERT INTO test_table (id, text) VALUES (?, ?)",
                                vec![Value::Integer(id), Value::Text(text.clone())],
                            )
                            .await;
                        // Check if real DB operation succeeded
                        match result {
                            Ok(_) => {
                                // Success - update shadow DB
                                if let Some(tx_id) = *current_tx_id {
                                    // In transaction - update transaction's view
                                    shared_shadow_db.insert(tx_id, id, text.clone()).unwrap();
                                } else {
                                    // Auto-commit - update shadow DB committed state
                                    // (modeled as an immediate begin+commit pair).
                                    shared_shadow_db.begin_transaction(next_tx_id, true);
                                    shared_shadow_db
                                        .insert(next_tx_id, id, text.clone())
                                        .unwrap();
                                    shared_shadow_db.commit_transaction(next_tx_id);
                                    next_tx_id += 1;
                                }
                            }
                            Err(e) => {
                                println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
                                if let Some(tx_id) = *current_tx_id {
                                    shared_shadow_db.rollback_transaction(tx_id);
                                    *current_tx_id = None;
                                }
                                // Check if it's an acceptable error
                                if !e.to_string().contains("database is locked") {
                                    panic!("Unexpected error during insert: {e}");
                                }
                            }
                        }
                    }
                    Operation::Update { id, text } => {
                        let result = conn
                            .execute(
                                "UPDATE test_table SET text = ? WHERE id = ?",
                                vec![Value::Text(text.clone()), Value::Integer(id)],
                            )
                            .await;
                        // Check if real DB operation succeeded
                        match result {
                            Ok(_) => {
                                // Success - update shadow DB
                                if let Some(tx_id) = *current_tx_id {
                                    // In transaction - update transaction's view
                                    shared_shadow_db.update(tx_id, id, text.clone()).unwrap();
                                } else {
                                    // Auto-commit - update shadow DB committed state
                                    shared_shadow_db.begin_transaction(next_tx_id, true);
                                    shared_shadow_db
                                        .update(next_tx_id, id, text.clone())
                                        .unwrap();
                                    shared_shadow_db.commit_transaction(next_tx_id);
                                    next_tx_id += 1;
                                }
                            }
                            Err(e) => {
                                println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
                                if let Some(tx_id) = *current_tx_id {
                                    shared_shadow_db.rollback_transaction(tx_id);
                                    *current_tx_id = None;
                                }
                                // Check if it's an acceptable error
                                if !e.to_string().contains("database is locked") {
                                    panic!("Unexpected error during update: {e}");
                                }
                            }
                        }
                    }
                    Operation::Delete { id } => {
                        let result = conn
                            .execute(
                                "DELETE FROM test_table WHERE id = ?",
                                vec![Value::Integer(id)],
                            )
                            .await;
                        // Check if real DB operation succeeded
                        match result {
                            Ok(_) => {
                                // Success - update shadow DB
                                if let Some(tx_id) = *current_tx_id {
                                    // In transaction - update transaction's view
                                    shared_shadow_db.delete(tx_id, id).unwrap();
                                } else {
                                    // Auto-commit - update shadow DB committed state
                                    shared_shadow_db.begin_transaction(next_tx_id, true);
                                    shared_shadow_db.delete(next_tx_id, id).unwrap();
                                    shared_shadow_db.commit_transaction(next_tx_id);
                                    next_tx_id += 1;
                                }
                            }
                            Err(e) => {
                                println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
                                if let Some(tx_id) = *current_tx_id {
                                    shared_shadow_db.rollback_transaction(tx_id);
                                    *current_tx_id = None;
                                }
                                // Check if it's an acceptable error
                                if !e.to_string().contains("database is locked") {
                                    panic!("Unexpected error during delete: {e}");
                                }
                            }
                        }
                    }
                    Operation::Select => {
                        // Verification step: real rows must equal the rows the
                        // shadow DB says are visible to this connection.
                        let query_str = "SELECT id, text FROM test_table ORDER BY id";
                        let mut rows = conn.query(query_str, ()).await.unwrap();
                        let mut real_rows = Vec::new();
                        while let Some(row) = rows.next().await.unwrap() {
                            let id = row.get_value(0).unwrap();
                            let text = row.get_value(1).unwrap();
                            if let (Value::Integer(id), Value::Text(text)) = (id, text) {
                                real_rows.push(DbRow { id, text });
                            }
                        }
                        real_rows.sort_by_key(|r| r.id);
                        let mut expected_rows = visible_rows.clone();
                        expected_rows.sort_by_key(|r| r.id);
                        if real_rows != expected_rows {
                            // Symmetric difference, for a readable failure message.
                            let diff = {
                                let mut diff = Vec::new();
                                for row in expected_rows.iter() {
                                    if !real_rows.contains(row) {
                                        diff.push(row);
                                    }
                                }
                                for row in real_rows.iter() {
                                    if !expected_rows.contains(row) {
                                        diff.push(row);
                                    }
                                }
                                diff
                            };
                            panic!(
                                "Row mismatch in iteration {iteration} Connection {conn_id}(op={op_num}). Query: {query_str}.\n\nExpected: {}\n\nGot: {}\n\nDiff: {}\n\nSeed: {seed}",
                                expected_rows.iter().map(|r| r.to_string()).collect::<Vec<_>>().join(", "),
                                real_rows.iter().map(|r| r.to_string()).collect::<Vec<_>>().join(", "),
                                diff.iter().map(|r| r.to_string()).collect::<Vec<_>>().join(", "),
                            );
                        }
                    }
                }
            }
        }
    }
}
/// Picks the next fuzz operation: ~10% BEGIN, ~5% COMMIT, ~5% ROLLBACK (each
/// only when valid for the current transaction state), data operations
/// otherwise. Exactly one RNG draw is consumed here before any delegation,
/// so seeded runs remain reproducible.
fn generate_operation(
    rng: &mut ChaCha8Rng,
    in_transaction: bool,
    visible_rows: &[DbRow],
) -> Operation {
    let roll = rng.gen_range(0..100);
    if roll < 10 && !in_transaction {
        Operation::Begin
    } else if (10..15).contains(&roll) && in_transaction {
        Operation::Commit
    } else if (15..20).contains(&roll) && in_transaction {
        Operation::Rollback
    } else {
        // Invalid transaction controls fall through to a data operation,
        // as does the remaining 80% of the probability mass.
        generate_data_operation(rng, visible_rows)
    }
}
/// Picks a concrete data operation uniformly among insert/update/delete/
/// select. Update and delete fall back to an insert when no rows are
/// visible. The order of RNG draws in every branch matches the original
/// implementation exactly, keeping seeded runs byte-for-byte reproducible.
fn generate_data_operation(rng: &mut ChaCha8Rng, visible_rows: &[DbRow]) -> Operation {
    let kind = rng.gen_range(0..4);
    if kind == 3 {
        return Operation::Select;
    }
    if kind == 0 {
        // Insert: choose an id strictly above the current maximum so it
        // cannot collide with a visible row.
        let id = match visible_rows.iter().map(|r| r.id).max() {
            None => rng.gen_range(1..1000),
            Some(max_id) => rng.gen_range(max_id + 1..max_id + 100),
        };
        let text = format!("text_{}", rng.gen_range(1..1000));
        return Operation::Insert { id, text };
    }
    // kind is 1 (update) or 2 (delete); both need an existing row.
    if visible_rows.is_empty() {
        // Nothing to target; fall back to inserting a fresh row.
        let id = rng.gen_range(1..1000);
        let text = format!("text_{}", rng.gen_range(1..1000));
        return Operation::Insert { id, text };
    }
    let id = visible_rows.choose(rng).unwrap().id;
    if kind == 1 {
        let text = format!("updated_{}", rng.gen_range(1..1000));
        Operation::Update { id, text }
    } else {
        Operation::Delete { id }
    }
}

461
bindings/wasm/lib.rs Normal file
View File

@@ -0,0 +1,461 @@
#[cfg(all(feature = "web", feature = "nodejs"))]
compile_error!("Features 'web' and 'nodejs' cannot be enabled at the same time");
use js_sys::{Array, Object};
use std::cell::RefCell;
use std::sync::Arc;
use turso_core::{Clock, Instant, OpenFlags, Result};
use wasm_bindgen::prelude::*;
// JS-facing database handle: owns the core database plus a single connection.
#[allow(dead_code)]
#[wasm_bindgen]
pub struct Database {
    // Held to keep the database alive; only `conn` is used directly in this
    // file (hence the dead_code allow).
    db: Arc<turso_core::Database>,
    conn: Arc<turso_core::Connection>,
}
#[allow(clippy::arc_with_non_send_sync)]
#[wasm_bindgen]
impl Database {
    /// Opens (creating if necessary) the database at `path` on the JS VFS and
    /// establishes a single connection. Panics on failure, which
    /// wasm_bindgen surfaces to JS as an exception.
    #[wasm_bindgen(constructor)]
    pub fn new(path: &str) -> Database {
        let io: Arc<dyn turso_core::IO> = Arc::new(PlatformIO { vfs: VFS::new() });
        let file = io.open_file(path, OpenFlags::Create, false).unwrap();
        let db_file = Arc::new(DatabaseFile::new(file));
        let db = turso_core::Database::open(io, path, db_file, false, false).unwrap();
        let conn = db.connect().unwrap();
        Database { db, conn }
    }

    /// Executes `sql` to completion, discarding any result rows.
    // Renamed from `_sql`: the leading underscore conventionally marks an
    // unused parameter, but the argument is used.
    #[wasm_bindgen]
    pub fn exec(&self, sql: &str) {
        self.conn.execute(sql).unwrap();
    }

    /// Compiles `sql` into a reusable `Statement` (raw mode off by default).
    #[wasm_bindgen]
    pub fn prepare(&self, sql: &str) -> Statement {
        let stmt = self.conn.prepare(sql).unwrap();
        Statement::new(RefCell::new(stmt), false)
    }
}
// JS-facing cursor over a statement's result rows; created by
// `Statement::iterate`.
#[wasm_bindgen]
pub struct RowIterator {
    // Interior mutability so stepping works through wasm_bindgen's generated
    // shims.
    inner: RefCell<turso_core::Statement>,
}
#[wasm_bindgen]
impl RowIterator {
    fn new(inner: RefCell<turso_core::Statement>) -> Self {
        Self { inner }
    }
    /// Steps the statement once and returns the next row as a JS array, or
    /// `undefined` when there is no row.
    /// NOTE(review): IO and Busy also yield `undefined`, which the iterator
    /// protocol in `Statement::iterate` interprets as `done` — confirm rows
    /// cannot be silently truncated when a step returns IO.
    #[wasm_bindgen]
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> JsValue {
        let mut stmt = self.inner.borrow_mut();
        match stmt.step() {
            Ok(turso_core::StepResult::Row) => {
                let row = stmt.row().unwrap();
                let row_array = Array::new();
                for value in row.get_values() {
                    let value = to_js_value(value);
                    row_array.push(&value);
                }
                JsValue::from(row_array)
            }
            Ok(turso_core::StepResult::IO) => JsValue::UNDEFINED,
            Ok(turso_core::StepResult::Done) | Ok(turso_core::StepResult::Interrupt) => {
                JsValue::UNDEFINED
            }
            Ok(turso_core::StepResult::Busy) => JsValue::UNDEFINED,
            Err(e) => panic!("Error: {e:?}"),
        }
    }
}
// JS-facing prepared statement with `get`/`all`/`iterate` accessors.
#[wasm_bindgen]
pub struct Statement {
    inner: RefCell<turso_core::Statement>,
    // Raw-mode flag (better-sqlite3 style API).
    // NOTE(review): stored and toggled but not consulted by `get`/`all`/
    // `iterate` in this file — confirm whether raw output is still pending.
    raw: bool,
}
#[wasm_bindgen]
impl Statement {
    fn new(inner: RefCell<turso_core::Statement>, raw: bool) -> Self {
        Self { inner, raw }
    }
    /// Enables raw mode (or sets it to `toggle`), returning the statement
    /// for chaining.
    #[wasm_bindgen]
    pub fn raw(mut self, toggle: Option<bool>) -> Self {
        self.raw = toggle.unwrap_or(true);
        self
    }
    /// Steps once and returns the row as a JS array, or `undefined` if the
    /// step produced no row (done, interrupted, busy, or pending IO).
    pub fn get(&self) -> JsValue {
        let mut stmt = self.inner.borrow_mut();
        match stmt.step() {
            Ok(turso_core::StepResult::Row) => {
                let row = stmt.row().unwrap();
                let row_array = js_sys::Array::new();
                for value in row.get_values() {
                    let value = to_js_value(value);
                    row_array.push(&value);
                }
                JsValue::from(row_array)
            }
            Ok(turso_core::StepResult::IO)
            | Ok(turso_core::StepResult::Done)
            | Ok(turso_core::StepResult::Interrupt)
            | Ok(turso_core::StepResult::Busy) => JsValue::UNDEFINED,
            Err(e) => panic!("Error: {e:?}"),
        }
    }
    /// Drains the statement, returning every row as an array of arrays.
    /// IO results are retried; Interrupt/Done/Busy end the loop with what
    /// has been collected so far.
    pub fn all(&self) -> js_sys::Array {
        let array = js_sys::Array::new();
        loop {
            let mut stmt = self.inner.borrow_mut();
            match stmt.step() {
                Ok(turso_core::StepResult::Row) => {
                    let row = stmt.row().unwrap();
                    let row_array = js_sys::Array::new();
                    for value in row.get_values() {
                        let value = to_js_value(value);
                        row_array.push(&value);
                    }
                    array.push(&row_array);
                }
                // Retry: IO completes synchronously on this VFS.
                Ok(turso_core::StepResult::IO) => {}
                Ok(turso_core::StepResult::Interrupt) => break,
                Ok(turso_core::StepResult::Done) => break,
                Ok(turso_core::StepResult::Busy) => break,
                Err(e) => panic!("Error: {e:?}"),
            }
        }
        array
    }
    /// Consumes the statement and returns a JS object implementing the
    /// iterator protocol (`next()` plus `Symbol.iterator`), backed by a
    /// `RowIterator` stored on the object itself.
    #[wasm_bindgen]
    pub fn iterate(self) -> JsValue {
        let iterator = RowIterator::new(self.inner);
        let iterator_obj = Object::new();
        // Define the next method that will be called by JavaScript
        let next_fn = js_sys::Function::new_with_args(
            "",
            "const value = this.iterator.next();
            const done = value === undefined;
            return {
                value,
                done
            };",
        );
        js_sys::Reflect::set(&iterator_obj, &JsValue::from_str("next"), &next_fn).unwrap();
        js_sys::Reflect::set(
            &iterator_obj,
            &JsValue::from_str("iterator"),
            &JsValue::from(iterator),
        )
        .unwrap();
        // Make the object itself iterable (for...of support).
        let symbol_iterator = js_sys::Function::new_no_args("return this;");
        js_sys::Reflect::set(&iterator_obj, &js_sys::Symbol::iterator(), &symbol_iterator).unwrap();
        JsValue::from(iterator_obj)
    }
}
/// Converts a core SQL value to its closest JavaScript representation.
/// Integers that fit in i32 cross the boundary as plain JS numbers; larger
/// ones take the i64 path.
fn to_js_value(value: &turso_core::Value) -> JsValue {
    match value {
        turso_core::Value::Null => JsValue::null(),
        turso_core::Value::Integer(i) => match i32::try_from(*i) {
            Ok(small) => JsValue::from(small),
            Err(_) => JsValue::from(*i),
        },
        turso_core::Value::Float(f) => JsValue::from(*f),
        turso_core::Value::Text(t) => JsValue::from_str(t.as_str()),
        turso_core::Value::Blob(b) => js_sys::Uint8Array::from(b.as_slice()).into(),
    }
}
// Handle to one open file descriptor in the JS-backed VFS.
pub struct File {
    vfs: VFS,
    fd: i32,
}
// SAFETY(review): wasm currently executes single-threaded, but `VFS` wraps a
// JS object — confirm these markers remain sound if wasm threads are enabled.
unsafe impl Send for File {}
unsafe impl Sync for File {}
#[allow(dead_code)]
impl File {
    fn new(vfs: VFS, fd: i32) -> Self {
        Self { vfs, fd }
    }
}
// All operations below complete synchronously: the JS VFS call runs inline
// and the completion is marked done before the method returns.
impl turso_core::File for File {
    // Locking is not implemented for the JS VFS.
    fn lock_file(&self, _exclusive: bool) -> Result<()> {
        // TODO
        Ok(())
    }
    fn unlock_file(&self) -> Result<()> {
        // TODO
        Ok(())
    }
    /// Positional read into the completion's buffer; completes with the
    /// number of bytes read.
    fn pread(
        &self,
        pos: usize,
        c: turso_core::Completion,
    ) -> Result<turso_core::Completion> {
        let r = match c.completion_type {
            turso_core::CompletionType::Read(ref r) => r,
            _ => unreachable!(),
        };
        // Scope the mutable borrow of the buffer before completing.
        let nr = {
            let mut buf = r.buf_mut();
            let buf: &mut [u8] = buf.as_mut_slice();
            self.vfs.pread(self.fd, buf, pos)
        };
        r.complete(nr);
        #[allow(clippy::arc_with_non_send_sync)]
        Ok(c)
    }
    /// Positional write; completes with the full buffer length.
    fn pwrite(
        &self,
        pos: usize,
        buffer: Arc<std::cell::RefCell<turso_core::Buffer>>,
        c: turso_core::Completion,
    ) -> Result<turso_core::Completion> {
        let w = match c.completion_type {
            turso_core::CompletionType::Write(ref w) => w,
            _ => unreachable!(),
        };
        let buf = buffer.borrow();
        let buf: &[u8] = buf.as_slice();
        self.vfs.pwrite(self.fd, buf, pos);
        w.complete(buf.len() as i32);
        #[allow(clippy::arc_with_non_send_sync)]
        Ok(c)
    }
    fn sync(&self, c: turso_core::Completion) -> Result<turso_core::Completion> {
        self.vfs.sync(self.fd);
        c.complete(0);
        #[allow(clippy::arc_with_non_send_sync)]
        Ok(c)
    }
    fn size(&self) -> Result<u64> {
        Ok(self.vfs.size(self.fd))
    }
    /// Sets the file length to `len` bytes via the JS VFS.
    fn truncate(
        &self,
        len: usize,
        c: turso_core::Completion,
    ) -> Result<turso_core::Completion> {
        self.vfs.truncate(self.fd, len);
        c.complete(0);
        #[allow(clippy::arc_with_non_send_sync)]
        Ok(c)
    }
}
// IO backend that routes all file operations through the JS VFS.
pub struct PlatformIO {
    vfs: VFS,
}
// SAFETY(review): wasm currently executes single-threaded, but `VFS` wraps a
// JS object — confirm these markers remain sound if wasm threads are enabled.
unsafe impl Send for PlatformIO {}
unsafe impl Sync for PlatformIO {}
impl Clock for PlatformIO {
    /// Wall-clock time sourced from the JS `Date` object (millisecond
    /// resolution, so `micros` is always a multiple of 1000).
    fn now(&self) -> Instant {
        let date = Date::new();
        let ms_since_epoch = date.getTime();
        Instant {
            secs: (ms_since_epoch / 1000.0) as i64,
            micros: ((ms_since_epoch % 1000.0) * 1000.0) as u32,
        }
    }
}
impl turso_core::IO for PlatformIO {
    /// Opens `path` through the JS VFS in "a+" mode; flags are ignored.
    /// NOTE(review): the returned `File` gets a brand-new `VFS::new()` even
    /// though `fd` was opened on `self.vfs` — confirm the JS VFS state is
    /// global, otherwise the fd may be invalid on the new instance.
    fn open_file(
        &self,
        path: &str,
        _flags: OpenFlags,
        _direct: bool,
    ) -> Result<Arc<dyn turso_core::File>> {
        let fd = self.vfs.open(path, "a+");
        Ok(Arc::new(File {
            vfs: VFS::new(),
            fd,
        }))
    }
    /// NOTE(review): `run_once` is a no-op, so this loop only terminates
    /// because every operation on this IO completes synchronously before
    /// being awaited — an incomplete completion would spin forever.
    fn wait_for_completion(&self, c: turso_core::Completion) -> Result<()> {
        while !c.is_completed() {
            self.run_once()?;
        }
        Ok(())
    }
    // No background work: all VFS calls complete inline.
    fn run_once(&self) -> Result<()> {
        Ok(())
    }
    /// Random i64 drawn from the platform CSPRNG.
    fn generate_random_number(&self) -> i64 {
        let mut buf = [0u8; 8];
        getrandom::getrandom(&mut buf).unwrap();
        i64::from_ne_bytes(buf)
    }
    fn get_memory_io(&self) -> Arc<turso_core::MemoryIO> {
        Arc::new(turso_core::MemoryIO::new())
    }
}
// Minimal binding to the JavaScript `Date` object, used by the Clock impl.
#[wasm_bindgen]
extern "C" {
    type Date;
    #[wasm_bindgen(constructor)]
    fn new() -> Date;
    // ISO-8601 string form; not referenced in this file.
    #[wasm_bindgen(method, getter)]
    fn toISOString(this: &Date) -> String;
    // Milliseconds since the Unix epoch.
    #[wasm_bindgen(method)]
    fn getTime(this: &Date) -> f64;
}
// Adapter exposing a `turso_core::File` as page-oriented database storage.
pub struct DatabaseFile {
    file: Arc<dyn turso_core::File>,
}
// SAFETY(review): relies on wasm's single-threaded execution; the wrapped
// trait object is already Send + Sync, so these should be redundant — verify.
unsafe impl Send for DatabaseFile {}
unsafe impl Sync for DatabaseFile {}
impl DatabaseFile {
    pub fn new(file: Arc<dyn turso_core::File>) -> Self {
        Self { file }
    }
}
impl turso_core::DatabaseStorage for DatabaseFile {
    /// Reads page `page_idx` (1-based) into the completion's buffer. The
    /// buffer length doubles as the page size and must be a power of two in
    /// [512, 65536]; anything else is rejected as not-a-database.
    fn read_page(&self, page_idx: usize, c: turso_core::Completion) -> Result<()> {
        let r = match c.completion_type {
            turso_core::CompletionType::Read(ref r) => r,
            _ => unreachable!(),
        };
        let size = r.buf().len();
        assert!(page_idx > 0);
        // `size & (size - 1) == 0` holds only for powers of two.
        if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
            return Err(turso_core::LimboError::NotADB);
        }
        let pos = (page_idx - 1) * size;
        self.file.pread(pos, c.into())?;
        Ok(())
    }
    /// Writes a full page at the 1-based `page_idx`; the page size is taken
    /// from the buffer length.
    fn write_page(
        &self,
        page_idx: usize,
        buffer: Arc<std::cell::RefCell<turso_core::Buffer>>,
        c: turso_core::Completion,
    ) -> Result<()> {
        let size = buffer.borrow().len();
        let pos = (page_idx - 1) * size;
        self.file.pwrite(pos, buffer, c.into())?;
        Ok(())
    }
    fn sync(&self, c: turso_core::Completion) -> Result<()> {
        let _ = self.file.sync(c.into())?;
        Ok(())
    }
    fn size(&self) -> Result<u64> {
        self.file.size()
    }
    fn truncate(&self, len: usize, c: turso_core::Completion) -> Result<()> {
        self.file.truncate(len, c)?;
        Ok(())
    }
}
// Browser implementation of the JS virtual filesystem ("web" feature).
// The web and nodejs bindings expose the identical method set, so the Rust
// side is feature-agnostic; exactly one of the two cfgs is active at a time.
#[cfg(all(feature = "web", not(feature = "nodejs")))]
#[wasm_bindgen(module = "/web/src/web-vfs.js")]
extern "C" {
    type VFS;
    #[wasm_bindgen(constructor)]
    fn new() -> VFS;
    // Returns a file-descriptor-like integer handle.
    #[wasm_bindgen(method)]
    fn open(this: &VFS, path: &str, flags: &str) -> i32;
    #[wasm_bindgen(method)]
    fn close(this: &VFS, fd: i32) -> bool;
    #[wasm_bindgen(method)]
    fn pwrite(this: &VFS, fd: i32, buffer: &[u8], offset: usize) -> i32;
    #[wasm_bindgen(method)]
    fn pread(this: &VFS, fd: i32, buffer: &mut [u8], offset: usize) -> i32;
    #[wasm_bindgen(method)]
    fn size(this: &VFS, fd: i32) -> u64;
    #[wasm_bindgen(method)]
    fn truncate(this: &VFS, fd: i32, len: usize);
    #[wasm_bindgen(method)]
    fn sync(this: &VFS, fd: i32);
}
// Node.js implementation of the same VFS interface ("nodejs" feature).
#[cfg(all(feature = "nodejs", not(feature = "web")))]
#[wasm_bindgen(module = "/node/src/vfs.cjs")]
extern "C" {
    type VFS;
    #[wasm_bindgen(constructor)]
    fn new() -> VFS;
    #[wasm_bindgen(method)]
    fn open(this: &VFS, path: &str, flags: &str) -> i32;
    #[wasm_bindgen(method)]
    fn close(this: &VFS, fd: i32) -> bool;
    #[wasm_bindgen(method)]
    fn pwrite(this: &VFS, fd: i32, buffer: &[u8], offset: usize) -> i32;
    #[wasm_bindgen(method)]
    fn pread(this: &VFS, fd: i32, buffer: &mut [u8], offset: usize) -> i32;
    #[wasm_bindgen(method)]
    fn size(this: &VFS, fd: i32) -> u64;
    #[wasm_bindgen(method)]
    fn truncate(this: &VFS, fd: i32, len: usize);
    #[wasm_bindgen(method)]
    fn sync(this: &VFS, fd: i32);
}
/// Module initializer: routes Rust panics to `console.error` so failures are
/// visible in the browser/Node console instead of an opaque trap.
#[wasm_bindgen(start)]
pub fn init() {
    console_error_panic_hook::set_once();
}

View File

@@ -613,8 +613,8 @@ impl Limbo {
std::process::exit(0)
}
Command::Open(args) => {
if self.open_db(&args.path, args.vfs_name.as_deref()).is_err() {
let _ = self.writeln("Error: Unable to open database file.");
if let Err(e) = self.open_db(&args.path, args.vfs_name.as_deref()) {
let _ = self.writeln(e.to_string());
}
}
Command::Schema(args) => {

View File

@@ -121,6 +121,14 @@ impl File for GenericFile {
Ok(c)
}
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
let mut file = self.file.borrow_mut();
file.set_len(len as u64)
.map_err(|err| LimboError::IOError(err))?;
c.complete(0);
Ok(c)
}
fn size(&self) -> Result<u64> {
let file = self.file.borrow();
Ok(file.metadata().unwrap().len())

View File

@@ -226,7 +226,7 @@ impl Clock for UringIO {
/// use the callback pointer as the user_data for the operation as is
/// common practice for io_uring to prevent more indirection
fn get_key(c: Completion) -> u64 {
Arc::into_raw(c.inner) as u64
Arc::into_raw(c.inner.clone()) as u64
}
#[inline(always)]
@@ -353,6 +353,17 @@ impl File for UringFile {
fn size(&self) -> Result<u64> {
Ok(self.file.metadata()?.len())
}
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
let mut io = self.io.borrow_mut();
let truncate = with_fd!(self, |fd| {
io_uring::opcode::Ftruncate::new(fd, len as u64)
.build()
.user_data(get_key(c.clone()))
});
io.ring.submit_entry(&truncate);
Ok(c)
}
}
impl Drop for UringFile {

View File

@@ -174,6 +174,19 @@ impl File for MemoryFile {
Ok(c)
}
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
if len < self.size.get() {
// Truncate pages
unsafe {
let pages = &mut *self.pages.get();
pages.retain(|&k, _| k * PAGE_SIZE < len);
}
}
self.size.set(len);
c.complete(0);
Ok(c)
}
fn size(&self) -> Result<u64> {
Ok(self.size.get() as u64)
}

View File

@@ -19,6 +19,7 @@ pub trait File: Send + Sync {
-> Result<Completion>;
fn sync(&self, c: Completion) -> Result<Completion>;
fn size(&self) -> Result<u64>;
fn truncate(&self, len: usize, c: Completion) -> Result<Completion>;
}
#[derive(Debug, Copy, Clone, PartialEq)]
@@ -53,6 +54,7 @@ pub trait IO: Clock + Send + Sync {
pub type Complete = dyn Fn(Arc<RefCell<Buffer>>, i32);
pub type WriteComplete = dyn Fn(i32);
pub type SyncComplete = dyn Fn(i32);
pub type TruncateComplete = dyn Fn(i32);
#[must_use]
#[derive(Clone)]
@@ -69,6 +71,7 @@ pub enum CompletionType {
Read(ReadCompletion),
Write(WriteCompletion),
Sync(SyncCompletion),
Truncate(TruncateCompletion),
}
pub struct ReadCompletion {
@@ -113,6 +116,14 @@ impl Completion {
))))
}
pub fn new_trunc<F>(complete: F) -> Self
where
F: Fn(i32) + 'static,
{
Self::new(CompletionType::Truncate(TruncateCompletion::new(Box::new(
complete,
))))
}
pub fn is_completed(&self) -> bool {
self.inner.is_completed.get()
}
@@ -122,6 +133,7 @@ impl Completion {
CompletionType::Read(r) => r.complete(result),
CompletionType::Write(w) => w.complete(result),
CompletionType::Sync(s) => s.complete(result), // fix
CompletionType::Truncate(t) => t.complete(result),
};
self.inner.is_completed.set(true);
}
@@ -191,6 +203,20 @@ impl SyncCompletion {
}
}
pub struct TruncateCompletion {
pub complete: Box<TruncateComplete>,
}
impl TruncateCompletion {
pub fn new(complete: Box<TruncateComplete>) -> Self {
Self { complete }
}
pub fn complete(&self, res: i32) {
(self.complete)(res);
}
}
pub type BufferData = Pin<Vec<u8>>;
pub type BufferDropFn = Rc<dyn Fn(BufferData)>;

View File

@@ -450,6 +450,22 @@ impl File for UnixFile<'_> {
let file = self.file.lock().unwrap();
Ok(file.metadata()?.len())
}
#[instrument(err, skip_all, level = Level::INFO)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
let file = self.file.lock().map_err(|e| {
LimboError::LockingError(format!("Failed to lock file for truncation: {e}"))
})?;
let result = file.set_len(len as u64);
match result {
Ok(()) => {
trace!("file truncated to len=({})", len);
c.complete(0);
Ok(c)
}
Err(e) => Err(e.into()),
}
}
}
impl Drop for UnixFile<'_> {

View File

@@ -43,10 +43,8 @@ impl IO for VfsMod {
Ok(())
}
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.is_completed() {
self.run_once()?;
}
fn wait_for_completion(&self, _c: Completion) -> Result<()> {
// for the moment anyway, this is currently a sync api
Ok(())
}
@@ -165,6 +163,20 @@ impl File for VfsFileImpl {
Ok(result as u64)
}
}
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
if self.vfs.is_null() {
return Err(LimboError::ExtensionError("VFS is null".to_string()));
}
let vfs = unsafe { &*self.vfs };
let result = unsafe { (vfs.truncate)(self.file, len as i64) };
if result.is_error() {
Err(LimboError::ExtensionError("truncate failed".to_string()))
} else {
c.complete(0);
Ok(c)
}
}
}
impl Drop for VfsMod {

View File

@@ -123,6 +123,13 @@ impl File for WindowsFile {
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
let file = self.file.write();
file.set_len(len as u64).map_err(LimboError::IOError)?;
c.complete(0);
Ok(c)
}
fn size(&self) -> Result<u64> {
let file = self.file.read();
Ok(file.metadata().unwrap().len())

View File

@@ -103,6 +103,7 @@ pub type Result<T, E = LimboError> = std::result::Result<T, E>;
enum TransactionState {
Write { schema_did_change: bool },
Read,
PendingUpgrade,
None,
}
@@ -311,9 +312,13 @@ impl Database {
db.with_schema_mut(|schema| {
schema.schema_version = get_schema_version(&conn)?;
if let Err(LimboError::ExtensionError(e)) =
schema.make_from_btree(None, pager, &syms)
{
let result = schema
.make_from_btree(None, pager.clone(), &syms)
.or_else(|e| {
pager.end_read_tx()?;
Err(e)
});
if let Err(LimboError::ExtensionError(e)) = result {
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
eprintln!("Warning: {e}");
@@ -1146,7 +1151,9 @@ impl Connection {
result::LimboResult::Busy => return Err(LimboError::Busy),
result::LimboResult::Ok => {}
}
match pager.io.block(|| pager.begin_write_tx())? {
match pager.io.block(|| pager.begin_write_tx()).inspect_err(|_| {
pager.end_read_tx().expect("read txn must be closed");
})? {
result::LimboResult::Busy => {
pager.end_read_tx().expect("read txn must be closed");
return Err(LimboError::Busy);
@@ -1163,12 +1170,13 @@ impl Connection {
{
let pager = self.pager.borrow();
{
let wal = pager.wal.borrow_mut();
wal.end_write_tx();
wal.end_read_tx();
}
// remove all non-commited changes in case if WAL session left some suffix without commit frame
pager.rollback(false, self).expect("rollback must succeed");
let wal = pager.wal.borrow_mut();
wal.end_write_tx();
wal.end_read_tx();
pager.rollback(false, self)?;
}
// let's re-parse schema from scratch if schema cookie changed compared to the our in-memory view of schema
@@ -1188,13 +1196,13 @@ impl Connection {
Ok(())
}
pub fn checkpoint(&self) -> Result<CheckpointResult> {
pub fn checkpoint(&self, mode: CheckpointMode) -> Result<CheckpointResult> {
if self.closed.get() {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
self.pager
.borrow()
.wal_checkpoint(self.wal_checkpoint_disabled.get())
.wal_checkpoint(self.wal_checkpoint_disabled.get(), mode)
}
/// Close a connection and checkpoint.
@@ -1206,16 +1214,18 @@ impl Connection {
match self.transaction_state.get() {
TransactionState::Write { schema_did_change } => {
let _result = self.pager.borrow().end_tx(
while let IOResult::IO = self.pager.borrow().end_tx(
true, // rollback = true for close
schema_did_change,
self,
self.wal_checkpoint_disabled.get(),
);
)? {
self.run_once()?;
}
self.transaction_state.set(TransactionState::None);
}
TransactionState::Read => {
let _result = self.pager.borrow().end_read_tx();
TransactionState::PendingUpgrade | TransactionState::Read => {
self.pager.borrow().end_read_tx()?;
self.transaction_state.set(TransactionState::None);
}
TransactionState::None => {
@@ -1719,13 +1729,6 @@ impl Statement {
if res.is_err() {
let state = self.program.connection.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
if let Err(e) = self
.pager
.rollback(schema_did_change, &self.program.connection)
{
// Let's panic for now as we don't want to leave state in a bad state.
panic!("rollback failed: {e:?}");
}
let end_tx_res =
self.pager
.end_tx(true, schema_did_change, &self.program.connection, true)?;

View File

@@ -1,4 +1,5 @@
/// Common results that different functions can return in limbo.
#[derive(Debug)]
pub enum LimboResult {
/// Couldn't acquire a lock
Busy,

View File

@@ -18,6 +18,7 @@ pub trait DatabaseStorage: Send + Sync {
) -> Result<Completion>;
fn sync(&self, c: Completion) -> Result<Completion>;
fn size(&self) -> Result<u64>;
fn truncate(&self, len: usize, c: Completion) -> Result<Completion>;
}
#[cfg(feature = "fs")]
@@ -69,6 +70,12 @@ impl DatabaseStorage for DatabaseFile {
fn size(&self) -> Result<u64> {
self.file.size()
}
#[instrument(skip_all, level = Level::INFO)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
let c = self.file.truncate(len, c)?;
Ok(c)
}
}
#[cfg(feature = "fs")]
@@ -122,6 +129,12 @@ impl DatabaseStorage for FileMemoryStorage {
fn size(&self) -> Result<u64> {
self.file.size()
}
#[instrument(skip_all, level = Level::INFO)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
let c = self.file.truncate(len, c)?;
Ok(c)
}
}
impl FileMemoryStorage {

View File

@@ -4,12 +4,12 @@ use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::header_accessor;
use crate::storage::sqlite3_ondisk::{
self, parse_wal_frame_header, DatabaseHeader, PageContent, PageType,
self, parse_wal_frame_header, DatabaseHeader, PageContent, PageType, DEFAULT_PAGE_SIZE,
};
use crate::storage::wal::{CheckpointResult, Wal};
use crate::types::{IOResult, WalInsertInfo};
use crate::util::IOExt as _;
use crate::{return_if_io, Completion};
use crate::{return_if_io, Completion, TransactionState};
use crate::{turso_assert, Buffer, Connection, LimboError, Result};
use parking_lot::RwLock;
use std::cell::{Cell, OnceCell, RefCell, UnsafeCell};
@@ -351,7 +351,7 @@ pub struct Pager {
free_page_state: RefCell<FreePageState>,
}
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Clone)]
/// The status of the current cache flush.
pub enum PagerCommitResult {
/// The WAL was written to disk and fsynced.
@@ -818,8 +818,14 @@ impl Pager {
) -> Result<IOResult<PagerCommitResult>> {
tracing::trace!("end_tx(rollback={})", rollback);
if rollback {
self.wal.borrow().end_write_tx();
if matches!(
connection.transaction_state.get(),
TransactionState::Write { .. }
) {
self.wal.borrow().end_write_tx();
}
self.wal.borrow().end_read_tx();
self.rollback(schema_did_change, connection)?;
return Ok(IOResult::Done(PagerCommitResult::Rollback));
}
let commit_status = self.commit_dirty_pages(wal_checkpoint_disabled)?;
@@ -1283,26 +1289,51 @@ impl Pager {
_attempts += 1;
}
}
self.wal_checkpoint(wal_checkpoint_disabled)?;
self.wal_checkpoint(wal_checkpoint_disabled, CheckpointMode::Passive)?;
Ok(())
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn wal_checkpoint(&self, wal_checkpoint_disabled: bool) -> Result<CheckpointResult> {
pub fn wal_checkpoint(
&self,
wal_checkpoint_disabled: bool,
mode: CheckpointMode,
) -> Result<CheckpointResult> {
if wal_checkpoint_disabled {
return Ok(CheckpointResult {
num_wal_frames: 0,
num_checkpointed_frames: 0,
});
return Ok(CheckpointResult::default());
}
let checkpoint_result = self.io.block(|| {
let counter = Rc::new(RefCell::new(0));
let mut checkpoint_result = self.io.block(|| {
self.wal
.borrow_mut()
.checkpoint(self, Rc::new(RefCell::new(0)), CheckpointMode::Passive)
.map_err(|err| panic!("error while clearing cache {err}"))
.checkpoint(self, counter.clone(), mode)
})?;
if checkpoint_result.everything_backfilled()
&& checkpoint_result.num_checkpointed_frames != 0
{
let db_size = header_accessor::get_database_size(self)?;
let page_size = self.page_size.get().unwrap_or(DEFAULT_PAGE_SIZE);
let expected = (db_size * page_size) as u64;
if expected < self.db_file.size()? {
self.io.wait_for_completion(self.db_file.truncate(
expected as usize,
Completion::new_trunc(move |_| {
tracing::trace!(
"Database file truncated to expected size: {} bytes",
expected
);
}),
)?)?;
self.io
.wait_for_completion(self.db_file.sync(Completion::new_sync(move |_| {
tracing::trace!("Database file syncd after truncation");
}))?)?;
}
checkpoint_result.release_guard();
}
// TODO: only clear cache of things that are really invalidated
self.page_cache
.write()

View File

@@ -1695,7 +1695,8 @@ pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<
};
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new_write(write_complete);
io.pwrite(0, buffer.clone(), c)
let c = io.pwrite(0, buffer.clone(), c.clone())?;
Ok(c)
}
/// Checks if payload will overflow a cell based on the maximum allowed size.

File diff suppressed because it is too large Load Diff

View File

@@ -344,12 +344,6 @@ fn query_pragma(
_ => CheckpointMode::Passive,
};
if !matches!(mode, CheckpointMode::Passive) {
return Err(LimboError::ParseError(
"only Passive mode supported".to_string(),
));
}
program.alloc_registers(2);
program.emit_insn(Insn::Checkpoint {
database: 0,

View File

@@ -62,7 +62,10 @@ use crate::{
vector::{vector32, vector64, vector_distance_cos, vector_distance_l2, vector_extract},
};
use crate::{info, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult, TransactionState};
use crate::{
info, turso_assert, BufferPool, MvCursor, OpenFlags, RefValue, Row, StepResult,
TransactionState,
};
use super::{
insn::{Cookie, RegisterOrLiteral},
@@ -326,17 +329,18 @@ pub fn op_checkpoint(
) -> Result<InsnFunctionStepResult> {
let Insn::Checkpoint {
database: _,
checkpoint_mode: _,
checkpoint_mode,
dest,
} = insn
else {
unreachable!("unexpected Insn {:?}", insn)
};
let result = program.connection.checkpoint();
let result = program.connection.checkpoint(*checkpoint_mode);
match result {
Ok(CheckpointResult {
num_wal_frames: num_wal_pages,
num_checkpointed_frames: num_checkpointed_pages,
..
}) => {
// https://sqlite.org/pragma.html#pragma_wal_checkpoint
// 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy).
@@ -1982,6 +1986,20 @@ pub fn op_transaction(
} else {
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
TransactionState::Write {
schema_did_change: false,
},
true,
)
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
@@ -2003,7 +2021,6 @@ pub fn op_transaction(
),
(TransactionState::None, false) => (TransactionState::Read, true),
};
if updated && matches!(current_state, TransactionState::None) {
if let LimboResult::Busy = pager.begin_read_tx()? {
return Ok(InsnFunctionStepResult::Busy);
@@ -2015,11 +2032,18 @@ pub fn op_transaction(
IOResult::Done(r) => {
if let LimboResult::Busy = r {
pager.end_read_tx()?;
conn.transaction_state.replace(TransactionState::None);
conn.auto_commit.replace(true);
return Ok(InsnFunctionStepResult::Busy);
}
}
IOResult::IO => {
pager.end_read_tx()?;
// set the transaction state to pending so we don't have to
// end the read transaction.
program
.connection
.transaction_state
.replace(TransactionState::PendingUpgrade);
return Ok(InsnFunctionStepResult::IO);
}
}
@@ -2062,7 +2086,8 @@ pub fn op_auto_commit(
if *auto_commit != conn.auto_commit.get() {
if *rollback {
// TODO(pere): add rollback I/O logic once we implement rollback journal
pager.rollback(schema_did_change, &conn)?;
return_if_io!(pager.end_tx(true, schema_did_change, &conn, false));
conn.transaction_state.replace(TransactionState::None);
conn.auto_commit.replace(true);
} else {
conn.auto_commit.replace(*auto_commit);
@@ -6101,6 +6126,7 @@ pub fn op_set_cookie(
},
TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"),
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),
TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"),
}
}
program
@@ -6325,6 +6351,9 @@ pub fn op_open_ephemeral(
}
OpOpenEphemeralState::StartingTxn { pager } => {
tracing::trace!("StartingTxn");
pager
.begin_read_tx() // we have to begin a read tx before beginning a write
.expect("Failed to start read transaction");
return_if_io!(pager.begin_write_tx());
state.op_open_ephemeral_state = OpOpenEphemeralState::CreateBtree {
pager: pager.clone(),

View File

@@ -27,7 +27,7 @@ pub mod sorter;
use crate::{
error::LimboError,
function::{AggFunc, FuncCtx},
storage::{pager, sqlite3_ondisk::SmallVec},
storage::sqlite3_ondisk::SmallVec,
translate::plan::TableReferences,
types::{IOResult, RawSlice, TextRef},
vdbe::execute::{OpIdxInsertState, OpInsertState, OpNewRowidState, OpSeekState},
@@ -398,7 +398,10 @@ impl Program {
// Connection is closed for whatever reason, rollback the transaction.
let state = self.connection.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
pager.rollback(schema_did_change, &self.connection)?
match pager.end_tx(true, schema_did_change, &self.connection, false)? {
IOResult::IO => return Ok(StepResult::IO),
IOResult::Done(_) => {}
}
}
return Err(LimboError::InternalError("Connection closed".to_string()));
}
@@ -481,6 +484,9 @@ impl Program {
Ok(StepResult::Done)
}
TransactionState::None => Ok(StepResult::Done),
TransactionState::PendingUpgrade => {
panic!("Unexpected transaction state: {current_state:?} during auto-commit",)
}
}
} else {
if self.change_cnt_on {
@@ -507,13 +513,10 @@ impl Program {
connection.wal_checkpoint_disabled.get(),
)?;
match cacheflush_status {
IOResult::Done(status) => {
IOResult::Done(_) => {
if self.change_cnt_on {
self.connection.set_changes(self.n_change.get());
}
if matches!(status, pager::PagerCommitResult::Rollback) {
pager.rollback(schema_did_change, connection)?;
}
connection.transaction_state.replace(TransactionState::None);
*commit_state = CommitState::Ready;
}
@@ -758,11 +761,15 @@ pub fn handle_program_error(
_ => {
let state = connection.transaction_state.get();
if let TransactionState::Write { schema_did_change } = state {
if let Err(e) = pager.rollback(schema_did_change, connection) {
tracing::error!("rollback failed: {e}");
}
if let Err(e) = pager.end_tx(false, schema_did_change, connection, false) {
tracing::error!("end_tx failed: {e}");
loop {
match pager.end_tx(true, schema_did_change, connection, false) {
Ok(IOResult::IO) => connection.run_once()?,
Ok(IOResult::Done(_)) => break,
Err(e) => {
tracing::error!("end_tx failed: {e}");
break;
}
}
}
} else if let Err(e) = pager.end_read_tx() {
tracing::error!("end_read_tx failed: {e}");

View File

@@ -41,6 +41,7 @@ pub trait VfsFile: Send + Sync {
fn read(&mut self, buf: &mut [u8], count: usize, offset: i64) -> ExtResult<i32>;
fn write(&mut self, buf: &[u8], count: usize, offset: i64) -> ExtResult<i32>;
fn sync(&self) -> ExtResult<()>;
fn truncate(&self, len: i64) -> ExtResult<()>;
fn size(&self) -> i64;
}
@@ -59,6 +60,7 @@ pub struct VfsImpl {
pub run_once: VfsRunOnce,
pub current_time: VfsGetCurrentTime,
pub gen_random_number: VfsGenerateRandomNumber,
pub truncate: VfsTruncate,
}
pub type RegisterVfsFn =
@@ -81,6 +83,8 @@ pub type VfsWrite =
pub type VfsSync = unsafe extern "C" fn(file: *const c_void) -> i32;
pub type VfsTruncate = unsafe extern "C" fn(file: *const c_void, len: i64) -> ResultCode;
pub type VfsLock = unsafe extern "C" fn(file: *const c_void, exclusive: bool) -> ResultCode;
pub type VfsUnlock = unsafe extern "C" fn(file: *const c_void) -> ResultCode;

View File

@@ -314,6 +314,11 @@ impl VfsFile for TestFile {
self.file.sync_all().map_err(|_| ResultCode::Error)
}
fn truncate(&self, len: i64) -> ExtResult<()> {
log::debug!("truncating file with testing VFS to length: {len}");
self.file.set_len(len as u64).map_err(|_| ResultCode::Error)
}
fn size(&self) -> i64 {
self.file.metadata().map(|m| m.len() as i64).unwrap_or(-1)
}

View File

@@ -11,6 +11,7 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream {
let close_fn_name = format_ident!("{}_close", struct_name);
let read_fn_name = format_ident!("{}_read", struct_name);
let write_fn_name = format_ident!("{}_write", struct_name);
let trunc_fn_name = format_ident!("{}_truncate", struct_name);
let lock_fn_name = format_ident!("{}_lock", struct_name);
let unlock_fn_name = format_ident!("{}_unlock", struct_name);
let sync_fn_name = format_ident!("{}_sync", struct_name);
@@ -36,6 +37,7 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream {
unlock: #unlock_fn_name,
sync: #sync_fn_name,
size: #size_fn_name,
truncate: #trunc_fn_name,
run_once: #run_once_fn_name,
gen_random_number: #generate_random_number_fn_name,
current_time: #get_current_time_fn_name,
@@ -59,6 +61,7 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream {
unlock: #unlock_fn_name,
sync: #sync_fn_name,
size: #size_fn_name,
truncate: #trunc_fn_name,
run_once: #run_once_fn_name,
gen_random_number: #generate_random_number_fn_name,
current_time: #get_current_time_fn_name,
@@ -188,6 +191,20 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream {
0
}
#[no_mangle]
pub unsafe extern "C" fn #trunc_fn_name(file_ptr: *const ::std::ffi::c_void, len: i64) -> ::turso_ext::ResultCode {
if file_ptr.is_null() {
return ::turso_ext::ResultCode::Error;
}
let vfs_file: &mut ::turso_ext::VfsFileImpl = &mut *(file_ptr as *mut ::turso_ext::VfsFileImpl);
let file: &mut <#struct_name as ::turso_ext::VfsExtension>::File =
&mut *(vfs_file.file as *mut <#struct_name as ::turso_ext::VfsExtension>::File);
if <#struct_name as ::turso_ext::VfsExtension>::File::truncate(file, len).is_err() {
return ::turso_ext::ResultCode::Error;
}
::turso_ext::ResultCode::OK
}
#[no_mangle]
pub unsafe extern "C" fn #size_fn_name(file_ptr: *const ::std::ffi::c_void) -> i64 {
if file_ptr.is_null() {

View File

@@ -225,6 +225,25 @@ impl File for SimulatorFile {
fn size(&self) -> Result<u64> {
self.inner.size()
}
fn truncate(&self, len: usize, c: turso_core::Completion) -> Result<turso_core::Completion> {
if self.fault.get() {
return Err(turso_core::LimboError::InternalError(
FAULT_ERROR_MSG.into(),
));
}
let c = if let Some(latency) = self.generate_latency_duration() {
let cloned_c = c.clone();
let op = Box::new(move |file: &SimulatorFile| file.inner.truncate(len, cloned_c));
self.queued_io
.borrow_mut()
.push(DelayedIo { time: latency, op });
c
} else {
self.inner.truncate(len, c)?
};
Ok(c)
}
}
impl Drop for SimulatorFile {

View File

@@ -3,7 +3,7 @@
use std::ffi::{self, CStr, CString};
use tracing::trace;
use turso_core::{LimboError, Value};
use turso_core::{CheckpointMode, LimboError, Value};
use std::sync::{Arc, Mutex};
@@ -1108,20 +1108,41 @@ pub unsafe extern "C" fn sqlite3_wal_checkpoint(
pub unsafe extern "C" fn sqlite3_wal_checkpoint_v2(
db: *mut sqlite3,
_db_name: *const ffi::c_char,
_mode: ffi::c_int,
_log_size: *mut ffi::c_int,
_checkpoint_count: *mut ffi::c_int,
mode: ffi::c_int,
log_size: *mut ffi::c_int,
checkpoint_count: *mut ffi::c_int,
) -> ffi::c_int {
if db.is_null() {
return SQLITE_MISUSE;
}
let db: &mut sqlite3 = &mut *db;
let db = db.inner.lock().unwrap();
// TODO: Checkpointing modes and reporting back log size and checkpoint count to caller.
if db.conn.checkpoint().is_err() {
return SQLITE_ERROR;
let chkptmode = match mode {
SQLITE_CHECKPOINT_PASSIVE => CheckpointMode::Passive,
SQLITE_CHECKPOINT_RESTART => CheckpointMode::Restart,
SQLITE_CHECKPOINT_TRUNCATE => CheckpointMode::Truncate,
SQLITE_CHECKPOINT_FULL => CheckpointMode::Full,
_ => return SQLITE_MISUSE, // Unsupported mode
};
match db.conn.checkpoint(chkptmode) {
Ok(res) => {
if !log_size.is_null() {
(*log_size) = res.num_wal_frames as ffi::c_int;
}
if !checkpoint_count.is_null() {
(*checkpoint_count) = res.num_checkpointed_frames as ffi::c_int;
}
SQLITE_OK
}
Err(e) => {
println!("Checkpoint error: {e}");
if matches!(e, turso_core::LimboError::Busy) {
SQLITE_BUSY
} else {
SQLITE_ERROR
}
}
}
SQLITE_OK
}
/// Get the number of frames in the WAL.

View File

@@ -167,16 +167,17 @@ mod tests {
SQLITE_OK
);
assert_eq!(
sqlite3_wal_checkpoint_v2(
db,
ptr::null(),
SQLITE_CHECKPOINT_FULL,
&mut log_size,
&mut checkpoint_count
),
SQLITE_OK
);
// TODO: uncomment when SQLITE_CHECKPOINT_FULL is supported
// assert_eq!(
// sqlite3_wal_checkpoint_v2(
// db,
// ptr::null(),
// SQLITE_CHECKPOINT_FULL,
// &mut log_size,
// &mut checkpoint_count
// ),
// SQLITE_OK
// );
assert_eq!(
sqlite3_wal_checkpoint_v2(

View File

@@ -175,6 +175,7 @@ fn test_wal_frame_transfer_no_schema_changes_rollback() {
assert_eq!(conn1.wal_frame_count().unwrap(), 14);
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
// Intentionally leave out the final commit frame, so the big randomblob is not committed and should not be visible to transactions.
for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) {
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
conn2.wal_insert_frame(frame_id, &frame).unwrap();

View File

@@ -3,7 +3,9 @@ use crate::common::{compare_string, do_flush, TempDatabase};
use log::debug;
use std::io::{Read, Seek, Write};
use std::sync::Arc;
use turso_core::{Connection, Database, LimboError, Row, Statement, StepResult, Value};
use turso_core::{
CheckpointMode, Connection, Database, LimboError, Row, Statement, StepResult, Value,
};
const WAL_HEADER_SIZE: usize = 32;
const WAL_FRAME_HEADER_SIZE: usize = 24;
@@ -285,7 +287,7 @@ fn test_wal_checkpoint() -> anyhow::Result<()> {
for i in 0..iterations {
let insert_query = format!("INSERT INTO test VALUES ({i})");
do_flush(&conn, &tmp_db)?;
conn.checkpoint()?;
conn.checkpoint(CheckpointMode::Passive)?;
run_query(&tmp_db, &conn, &insert_query)?;
}