mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-19 06:55:18 +01:00
Only checkpoint final remaining DB connection, and use Truncate mode
This commit is contained in:
37
core/lib.rs
37
core/lib.rs
@@ -71,7 +71,7 @@ use std::{
|
||||
num::NonZero,
|
||||
ops::Deref,
|
||||
rc::Rc,
|
||||
sync::{Arc, LazyLock, Mutex, Weak},
|
||||
sync::{atomic::AtomicUsize, Arc, LazyLock, Mutex, Weak},
|
||||
};
|
||||
#[cfg(feature = "fs")]
|
||||
use storage::database::DatabaseFile;
|
||||
@@ -137,6 +137,7 @@ pub struct Database {
|
||||
open_flags: OpenFlags,
|
||||
builtin_syms: RefCell<SymbolTable>,
|
||||
experimental_views: bool,
|
||||
n_connections: AtomicUsize,
|
||||
}
|
||||
|
||||
unsafe impl Send for Database {}
|
||||
@@ -185,6 +186,12 @@ impl fmt::Debug for Database {
|
||||
};
|
||||
debug_struct.field("page_cache", &cache_info);
|
||||
|
||||
debug_struct.field(
|
||||
"n_connections",
|
||||
&self
|
||||
.n_connections
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
);
|
||||
debug_struct.finish()
|
||||
}
|
||||
}
|
||||
@@ -372,6 +379,7 @@ impl Database {
|
||||
init_lock: Arc::new(Mutex::new(())),
|
||||
experimental_views: enable_views,
|
||||
buffer_pool: BufferPool::begin_init(&io, arena_size),
|
||||
n_connections: AtomicUsize::new(0),
|
||||
});
|
||||
db.register_global_builtin_extensions()
|
||||
.expect("unable to register global extensions");
|
||||
@@ -425,6 +433,8 @@ impl Database {
|
||||
.unwrap_or_default()
|
||||
.get();
|
||||
|
||||
self.n_connections
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
let conn = Arc::new(Connection {
|
||||
_db: self.clone(),
|
||||
pager: RefCell::new(Rc::new(pager)),
|
||||
@@ -888,6 +898,17 @@ pub struct Connection {
|
||||
encryption_key: RefCell<Option<EncryptionKey>>,
|
||||
}
|
||||
|
||||
impl Drop for Connection {
|
||||
fn drop(&mut self) {
|
||||
if !self.closed.get() {
|
||||
// if connection wasn't properly closed, decrement the connection counter
|
||||
self._db
|
||||
.n_connections
|
||||
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn prepare(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
|
||||
@@ -1506,9 +1527,17 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
self.pager
|
||||
.borrow()
|
||||
.checkpoint_shutdown(self.wal_auto_checkpoint_disabled.get())
|
||||
if self
|
||||
._db
|
||||
.n_connections
|
||||
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed)
|
||||
.eq(&1)
|
||||
{
|
||||
self.pager
|
||||
.borrow()
|
||||
.checkpoint_shutdown(self.wal_auto_checkpoint_disabled.get());
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn wal_auto_checkpoint_disable(&self) {
|
||||
|
||||
@@ -1555,8 +1555,18 @@ impl Pager {
|
||||
.expect("Failed to clear page cache");
|
||||
}
|
||||
|
||||
/// Checkpoint in Truncate mode and delete the WAL file. This method is _only_ to be called
|
||||
/// for shutting down the last remaining connection to a database.
|
||||
///
|
||||
/// sqlite3.h
|
||||
/// Usually, when a database in [WAL mode] is closed or detached from a
|
||||
/// database handle, SQLite checks if if there are other connections to the
|
||||
/// same database, and if there are no other database connection (if the
|
||||
/// connection being closed is the last open connection to the database),
|
||||
/// then SQLite performs a [checkpoint] before closing the connection and
|
||||
/// deletes the WAL file.
|
||||
pub fn checkpoint_shutdown(&self, wal_auto_checkpoint_disabled: bool) -> Result<()> {
|
||||
let mut _attempts = 0;
|
||||
let mut attempts = 0;
|
||||
{
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
return Err(LimboError::InternalError(
|
||||
@@ -1565,16 +1575,22 @@ impl Pager {
|
||||
};
|
||||
let mut wal = wal.borrow_mut();
|
||||
// fsync the wal syncronously before beginning checkpoint
|
||||
// TODO: for now forget about timeouts as they fail regularly in SIM
|
||||
// need to think of a better way to do this
|
||||
let c = wal.sync()?;
|
||||
self.io.wait_for_completion(c)?;
|
||||
}
|
||||
if !wal_auto_checkpoint_disabled {
|
||||
self.wal_checkpoint(CheckpointMode::Passive {
|
||||
while let Err(LimboError::Busy) = self.wal_checkpoint(CheckpointMode::Truncate {
|
||||
upper_bound_inclusive: None,
|
||||
})?;
|
||||
}) {
|
||||
if attempts == 3 {
|
||||
// don't return error on `close` if we are unable to checkpoint, we can
|
||||
// silently fail
|
||||
return Ok(());
|
||||
}
|
||||
attempts += 1;
|
||||
}
|
||||
}
|
||||
// TODO: delete the WAL file here after truncate checkpoint
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user