mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-28 05:24:22 +01:00
Merge 'core/db&pager: fix locking for initializing empty database' from Jussi Saurio
When `struct Database` is constructed, store `is_empty` as an `Arc<AtomicBool>` - the value is true if: 1. DB size is zero 2. WAL has no frames When `struct Pager` is constructed, this `Arc` is simply cloned. When any connection runs a transaction it will first check `Pager::is_empty`, and if the DB is empty, it will lock `init_lock` and then check `is_empty` again, and if it's still true, it allocates page1 and stores `false` in the `is_empty` `AtomicBool` and drops the lock. --- Note that Limbo can currently have a zero DB and a WAL with frames, as we have no special logic for folding page1 to the main DB file during initialization. Page 1 allocation currently happens on the first transaction (read or write, due to having to support `select * from sqlite_schema` on an empty DB; we should really check how SQLite actually does this.). Closes #1830
This commit is contained in:
16
core/lib.rs
16
core/lib.rs
@@ -51,7 +51,7 @@ pub use io::{
|
||||
use limbo_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
|
||||
use parking_lot::RwLock;
|
||||
use schema::Schema;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
cell::{Cell, RefCell, UnsafeCell},
|
||||
@@ -105,6 +105,7 @@ pub struct Database {
|
||||
// create DB connections.
|
||||
_shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
maybe_shared_wal: RwLock<Option<Arc<UnsafeCell<WalFileShared>>>>,
|
||||
is_empty: Arc<AtomicBool>,
|
||||
open_flags: OpenFlags,
|
||||
}
|
||||
|
||||
@@ -163,6 +164,8 @@ impl Database {
|
||||
unsafe { &*wal.get() }.max_frame.load(Ordering::SeqCst) > 0
|
||||
});
|
||||
|
||||
let is_empty = db_size == 0 && !wal_has_frames;
|
||||
|
||||
let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
|
||||
let schema = Arc::new(RwLock::new(Schema::new()));
|
||||
let db = Database {
|
||||
@@ -174,11 +177,12 @@ impl Database {
|
||||
db_file,
|
||||
io: io.clone(),
|
||||
open_flags: flags,
|
||||
is_empty: Arc::new(AtomicBool::new(is_empty)),
|
||||
};
|
||||
let db = Arc::new(db);
|
||||
|
||||
// Check: https://github.com/tursodatabase/limbo/pull/1761#discussion_r2154013123
|
||||
if db_size > 0 || wal_has_frames {
|
||||
if !is_empty {
|
||||
// parse schema
|
||||
let conn = db.connect()?;
|
||||
let rows = conn.query("SELECT * FROM sqlite_schema")?;
|
||||
@@ -203,11 +207,7 @@ impl Database {
|
||||
// Open existing WAL file if present
|
||||
if let Some(shared_wal) = self.maybe_shared_wal.read().clone() {
|
||||
// No pages in DB file or WAL -> empty database
|
||||
let is_empty = self.db_file.size()? == 0
|
||||
&& unsafe { &*shared_wal.get() }
|
||||
.max_frame
|
||||
.load(Ordering::SeqCst)
|
||||
== 0;
|
||||
let is_empty = self.is_empty.clone();
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
self.io.clone(),
|
||||
shared_wal,
|
||||
@@ -251,7 +251,7 @@ impl Database {
|
||||
// No existing WAL; create one.
|
||||
// TODO: currently Pager needs to be instantiated with some implementation of trait Wal, so here's a workaround.
|
||||
let dummy_wal = Rc::new(RefCell::new(DummyWAL {}));
|
||||
let is_empty = self.db_file.size()? == 0;
|
||||
let is_empty = self.is_empty.clone();
|
||||
let mut pager = Pager::new(
|
||||
self.db_file.clone(),
|
||||
dummy_wal,
|
||||
|
||||
@@ -6525,7 +6525,13 @@ mod tests {
|
||||
BufferPool, Connection, StepResult, WalFile, WalFileShared, WriteCompletion,
|
||||
};
|
||||
use std::{
|
||||
cell::RefCell, collections::HashSet, mem::transmute, ops::Deref, panic, rc::Rc, sync::Arc,
|
||||
cell::RefCell,
|
||||
collections::HashSet,
|
||||
mem::transmute,
|
||||
ops::Deref,
|
||||
panic,
|
||||
rc::Rc,
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
};
|
||||
|
||||
use tempfile::TempDir;
|
||||
@@ -6870,7 +6876,17 @@ mod tests {
|
||||
let wal = Rc::new(RefCell::new(wal_file));
|
||||
|
||||
let page_cache = Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(2000)));
|
||||
let pager = { Pager::new(db_file, wal, io, page_cache, buffer_pool, true).unwrap() };
|
||||
let pager = {
|
||||
Pager::new(
|
||||
db_file,
|
||||
wal,
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool,
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
)
|
||||
.unwrap()
|
||||
};
|
||||
let pager = Rc::new(pager);
|
||||
// FIXME: handle page cache is full
|
||||
pager.allocate_page1().unwrap();
|
||||
@@ -7392,7 +7408,7 @@ mod tests {
|
||||
io,
|
||||
Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10))),
|
||||
buffer_pool,
|
||||
true,
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
@@ -213,7 +213,7 @@ pub struct Pager {
|
||||
syncing: Rc<RefCell<bool>>,
|
||||
auto_vacuum_mode: RefCell<AutoVacuumMode>,
|
||||
/// Is the db empty? This is signified by 0-sized database and nonexistent WAL.
|
||||
pub is_empty: AtomicBool,
|
||||
pub is_empty: Arc<AtomicBool>,
|
||||
/// Mutex for synchronizing database initialization to prevent race conditions
|
||||
init_lock: Mutex<()>,
|
||||
}
|
||||
@@ -243,7 +243,7 @@ impl Pager {
|
||||
io: Arc<dyn crate::io::IO>,
|
||||
page_cache: Arc<RwLock<DumbLruPageCache>>,
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
is_empty: bool,
|
||||
is_empty: Arc<AtomicBool>,
|
||||
) -> Result<Self> {
|
||||
Ok(Self {
|
||||
db_file,
|
||||
@@ -260,7 +260,7 @@ impl Pager {
|
||||
checkpoint_inflight: Rc::new(RefCell::new(0)),
|
||||
buffer_pool,
|
||||
auto_vacuum_mode: RefCell::new(AutoVacuumMode::None),
|
||||
is_empty: AtomicBool::new(is_empty),
|
||||
is_empty,
|
||||
init_lock: Mutex::new(()),
|
||||
})
|
||||
}
|
||||
@@ -554,10 +554,11 @@ impl Pager {
|
||||
#[inline(always)]
|
||||
pub fn begin_read_tx(&self) -> Result<LimboResult> {
|
||||
// We allocate the first page lazily in the first transaction
|
||||
// Use a loop similar to SQLite's btreeBeginTrans to handle concurrent initialization
|
||||
while self.is_empty.load(Ordering::SeqCst) {
|
||||
if self.is_empty.load(Ordering::SeqCst) {
|
||||
let _lock = self.init_lock.lock().unwrap();
|
||||
self.allocate_page1()?;
|
||||
if self.is_empty.load(Ordering::SeqCst) {
|
||||
self.allocate_page1()?;
|
||||
}
|
||||
}
|
||||
self.wal.borrow_mut().begin_read_tx()
|
||||
}
|
||||
@@ -566,9 +567,11 @@ impl Pager {
|
||||
pub fn begin_write_tx(&self) -> Result<LimboResult> {
|
||||
// TODO(Diego): The only possibly allocate page1 here is because OpenEphemeral needs a write transaction
|
||||
// we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans
|
||||
while self.is_empty.load(Ordering::SeqCst) {
|
||||
if self.is_empty.load(Ordering::SeqCst) {
|
||||
let _lock = self.init_lock.lock().unwrap();
|
||||
self.allocate_page1()?;
|
||||
if self.is_empty.load(Ordering::SeqCst) {
|
||||
self.allocate_page1()?;
|
||||
}
|
||||
}
|
||||
self.wal.borrow_mut().begin_write_tx()
|
||||
}
|
||||
@@ -1352,7 +1355,15 @@ mod ptrmap_tests {
|
||||
buffer_pool.clone(),
|
||||
)));
|
||||
|
||||
let pager = Pager::new(db_file, wal, io, page_cache, buffer_pool, true).unwrap();
|
||||
let pager = Pager::new(
|
||||
db_file,
|
||||
wal,
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool,
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
)
|
||||
.unwrap();
|
||||
pager.allocate_page1().unwrap();
|
||||
header_accessor::set_vacuum_mode_largest_root_page(&pager, 1).unwrap();
|
||||
pager.set_auto_vacuum_mode(AutoVacuumMode::Full);
|
||||
|
||||
@@ -25,6 +25,7 @@ use crate::{
|
||||
},
|
||||
types::compare_immutable,
|
||||
};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::{borrow::BorrowMut, rc::Rc, sync::Arc};
|
||||
|
||||
use crate::{pseudo::PseudoCursor, result::LimboResult};
|
||||
@@ -33,7 +34,6 @@ use crate::{
|
||||
schema::{affinity, Affinity},
|
||||
storage::btree::{BTreeCursor, BTreeKey},
|
||||
};
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use crate::{
|
||||
storage::wal::CheckpointResult,
|
||||
@@ -1684,11 +1684,6 @@ pub fn op_transaction(
|
||||
return Err(LimboError::ReadOnly);
|
||||
}
|
||||
|
||||
// We allocate the first page lazily in the first transaction
|
||||
if conn.pager.is_empty.load(Ordering::SeqCst) {
|
||||
conn.pager.allocate_page1()?;
|
||||
}
|
||||
|
||||
if let Some(mv_store) = &mv_store {
|
||||
if state.mv_tx_id.is_none() {
|
||||
let tx_id = mv_store.begin_tx();
|
||||
@@ -5216,7 +5211,7 @@ pub fn op_open_ephemeral(
|
||||
io,
|
||||
page_cache,
|
||||
buffer_pool.clone(),
|
||||
true,
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
)?);
|
||||
|
||||
let page_size = header_accessor::get_page_size(&pager)
|
||||
|
||||
Reference in New Issue
Block a user