mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-09 11:14:20 +01:00
Merge 'core: Initial pass on synchronous pragma' from Pekka Enberg
This adds support for "OFF" and "FULL" (default) synchronous modes. As future work, we need to add NORMAL and EXTRA as well because applications expect them. Closes #2833
This commit is contained in:
@@ -160,7 +160,7 @@ Turso aims to be fully compatible with SQLite, with opt-in features not supporte
|
||||
| PRAGMA shrink_memory | No | |
|
||||
| PRAGMA soft_heap_limit | No | |
|
||||
| PRAGMA stats | No | Used for testing in SQLite |
|
||||
| PRAGMA synchronous | No | |
|
||||
| PRAGMA synchronous | Partial | `OFF` and `FULL` supported |
|
||||
| PRAGMA table_info | Yes | |
|
||||
| PRAGMA table_list | No | |
|
||||
| PRAGMA table_xinfo | No | |
|
||||
|
||||
21
core/lib.rs
21
core/lib.rs
@@ -109,6 +109,12 @@ enum TransactionState {
|
||||
None,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||
pub enum SyncMode {
|
||||
Off = 0,
|
||||
Full = 2,
|
||||
}
|
||||
|
||||
pub(crate) type MvStore = mvcc::MvStore<mvcc::LocalClock>;
|
||||
|
||||
pub(crate) type MvCursor = mvcc::cursor::MvccLazyCursor<mvcc::LocalClock>;
|
||||
@@ -466,6 +472,7 @@ impl Database {
|
||||
is_nested_stmt: Cell::new(false),
|
||||
encryption_key: RefCell::new(None),
|
||||
encryption_cipher_mode: Cell::new(None),
|
||||
sync_mode: Cell::new(SyncMode::Full),
|
||||
});
|
||||
self.n_connections
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
@@ -900,6 +907,7 @@ pub struct Connection {
|
||||
is_nested_stmt: Cell<bool>,
|
||||
encryption_key: RefCell<Option<EncryptionKey>>,
|
||||
encryption_cipher_mode: Cell<Option<CipherMode>>,
|
||||
sync_mode: Cell<SyncMode>,
|
||||
}
|
||||
|
||||
impl Drop for Connection {
|
||||
@@ -1459,7 +1467,10 @@ impl Connection {
|
||||
};
|
||||
|
||||
let commit_err = if force_commit {
|
||||
pager.io.block(|| pager.commit_dirty_pages(true)).err()
|
||||
pager
|
||||
.io
|
||||
.block(|| pager.commit_dirty_pages(true, self.get_sync_mode()))
|
||||
.err()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -1982,6 +1993,14 @@ impl Connection {
|
||||
self.query_only.set(value);
|
||||
}
|
||||
|
||||
pub fn get_sync_mode(&self) -> SyncMode {
|
||||
self.sync_mode.get()
|
||||
}
|
||||
|
||||
pub fn set_sync_mode(&self, mode: SyncMode) {
|
||||
self.sync_mode.set(mode);
|
||||
}
|
||||
|
||||
/// Creates a HashSet of modules that have been loaded
|
||||
pub fn get_syms_vtab_mods(&self) -> std::collections::HashSet<String> {
|
||||
self.syms.borrow().vtab_modules.keys().cloned().collect()
|
||||
|
||||
@@ -81,6 +81,10 @@ pub fn pragma_for(pragma: &PragmaName) -> Pragma {
|
||||
PragmaFlags::NoColumns1 | PragmaFlags::Result0,
|
||||
&["schema_version"],
|
||||
),
|
||||
Synchronous => Pragma::new(
|
||||
PragmaFlags::NoColumns1 | PragmaFlags::Result0,
|
||||
&["synchronous"],
|
||||
),
|
||||
TableInfo => Pragma::new(
|
||||
PragmaFlags::NeedSchema | PragmaFlags::Result1 | PragmaFlags::SchemaOpt,
|
||||
&["cid", "name", "type", "notnull", "dflt_value", "pk"],
|
||||
|
||||
@@ -1059,8 +1059,10 @@ impl Pager {
|
||||
self.rollback(schema_did_change, connection, is_write)?;
|
||||
return Ok(IOResult::Done(PagerCommitResult::Rollback));
|
||||
}
|
||||
let commit_status =
|
||||
return_if_io!(self.commit_dirty_pages(connection.wal_auto_checkpoint_disabled.get()));
|
||||
let commit_status = return_if_io!(self.commit_dirty_pages(
|
||||
connection.wal_auto_checkpoint_disabled.get(),
|
||||
connection.get_sync_mode()
|
||||
));
|
||||
wal.borrow().end_write_tx();
|
||||
wal.borrow().end_read_tx();
|
||||
|
||||
@@ -1306,6 +1308,7 @@ impl Pager {
|
||||
pub fn commit_dirty_pages(
|
||||
&self,
|
||||
wal_auto_checkpoint_disabled: bool,
|
||||
sync_mode: crate::SyncMode,
|
||||
) -> Result<IOResult<PagerCommitResult>> {
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
@@ -1382,7 +1385,12 @@ impl Pager {
|
||||
if completions.is_empty() {
|
||||
return Ok(IOResult::Done(PagerCommitResult::WalWritten));
|
||||
} else {
|
||||
self.commit_info.state.set(CommitState::SyncWal);
|
||||
// Skip sync if synchronous mode is OFF
|
||||
if sync_mode == crate::SyncMode::Off {
|
||||
self.commit_info.state.set(CommitState::AfterSyncWal);
|
||||
} else {
|
||||
self.commit_info.state.set(CommitState::SyncWal);
|
||||
}
|
||||
}
|
||||
if !completions.iter().all(|c| c.is_completed()) {
|
||||
io_yield_many!(completions);
|
||||
@@ -1405,7 +1413,12 @@ impl Pager {
|
||||
}
|
||||
CommitState::Checkpoint => {
|
||||
checkpoint_result = return_if_io!(self.checkpoint());
|
||||
self.commit_info.state.set(CommitState::SyncDbFile);
|
||||
// Skip sync if synchronous mode is OFF
|
||||
if sync_mode == crate::SyncMode::Off {
|
||||
self.commit_info.state.set(CommitState::AfterSyncDbFile);
|
||||
} else {
|
||||
self.commit_info.state.set(CommitState::SyncDbFile);
|
||||
}
|
||||
}
|
||||
CommitState::SyncDbFile => {
|
||||
let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
|
||||
|
||||
@@ -324,6 +324,27 @@ fn update_pragma(
|
||||
connection.set_encryption_cipher(cipher);
|
||||
Ok((program, TransactionMode::None))
|
||||
}
|
||||
PragmaName::Synchronous => {
|
||||
use crate::SyncMode;
|
||||
|
||||
let mode = match value {
|
||||
Expr::Name(name) => {
|
||||
let name_upper = name.as_str().to_uppercase();
|
||||
match name_upper.as_str() {
|
||||
"OFF" | "FALSE" | "NO" | "0" => SyncMode::Off,
|
||||
_ => SyncMode::Full,
|
||||
}
|
||||
}
|
||||
Expr::Literal(Literal::Numeric(n)) => match n.as_str() {
|
||||
"0" => SyncMode::Off,
|
||||
_ => SyncMode::Full,
|
||||
},
|
||||
_ => SyncMode::Full,
|
||||
};
|
||||
|
||||
connection.set_sync_mode(mode);
|
||||
Ok((program, TransactionMode::None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -604,6 +625,14 @@ fn query_pragma(
|
||||
}
|
||||
Ok((program, TransactionMode::None))
|
||||
}
|
||||
PragmaName::Synchronous => {
|
||||
let mode = connection.get_sync_mode();
|
||||
let register = program.alloc_register();
|
||||
program.emit_int(mode as i64, register);
|
||||
program.emit_result_row(register, 1);
|
||||
program.add_pragma_result_column(pragma.to_string());
|
||||
Ok((program, TransactionMode::None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1360,6 +1360,8 @@ pub enum PragmaName {
|
||||
QueryOnly,
|
||||
/// Returns schema version of the database file.
|
||||
SchemaVersion,
|
||||
/// Control database synchronization mode (OFF | FULL | NORMAL | EXTRA)
|
||||
Synchronous,
|
||||
/// returns information about the columns of a table
|
||||
TableInfo,
|
||||
/// enable capture-changes logic for the connection
|
||||
|
||||
Reference in New Issue
Block a user