Merge 'Fix DumbLruPageCache, Connection::execute and page 1 read from wal' from Pere Diaz Bou

You might have guessed the `DumbLruCache` was dumb, but no, it was a
reference to myself. This PR fixes some dumb mistakes in node detachment,
which failed to update `tail` and `head` properly.
Edit: This PR now also fixes `Connection::execute`, which, funnily enough,
appeared to return successfully when in reality `program.step` returned
`IO` — making us think execution had finished when it had not.
Edit 2: This PR fixes yet another thing! Pages read from the WAL were being
initialized as if they had `id > 1`. With that mistake removed, we can now
read `page 1` from the `WAL` without failing catastrophically.

Closes #1115
This commit is contained in:
Pekka Enberg
2025-03-12 18:50:48 +02:00
11 changed files with 275 additions and 69 deletions

18
Cargo.lock generated
View File

@@ -48,6 +48,12 @@ dependencies = [
"equator",
]
[[package]]
name = "allocator-api2"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "anarchist-readable-name-generator-lib"
version = "0.1.2"
@@ -1083,6 +1089,8 @@ version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash",
]
@@ -1658,6 +1666,7 @@ dependencies = [
"limbo_sqlite3_parser",
"limbo_time",
"limbo_uuid",
"lru",
"miette",
"mimalloc",
"parking_lot",
@@ -1887,6 +1896,15 @@ version = "0.4.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]]
name = "lru"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "227748d55f2f0ab4735d87fd623798cb6b664512fe979705f829c9f81c934465"
dependencies = [
"hashbrown",
]
[[package]]
name = "matchers"
version = "0.1.0"

View File

@@ -97,6 +97,7 @@ rand = "0.8.5" # Required for quickcheck
rand_chacha = "0.9.0"
env_logger = "0.11.6"
test-log = { version = "0.2.17", features = ["trace"] }
lru = "0.13.0"
[[bench]]
name = "benchmark"

View File

@@ -387,6 +387,8 @@ impl Connection {
QueryRunner::new(self, sql)
}
/// Execute will run a query from start to finish taking ownership of I/O because it will run pending I/Os if it didn't finish.
/// TODO: make this api async
pub fn execute(self: &Rc<Connection>, sql: impl AsRef<str>) -> Result<()> {
let sql = sql.as_ref();
let mut parser = Parser::new(sql.as_bytes());
@@ -428,7 +430,17 @@ impl Connection {
let mut state =
vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len());
program.step(&mut state, self._db.mv_store.clone(), self.pager.clone())?;
loop {
let res = program.step(
&mut state,
self._db.mv_store.clone(),
self.pager.clone(),
)?;
if matches!(res, StepResult::Done) {
break;
}
self._db.io.run_once()?;
}
}
}
}

View File

@@ -284,7 +284,7 @@ impl BTreeCursor {
}
let cell_idx = cell_idx as usize;
debug!(
tracing::trace!(
"get_prev_record current id={} cell={}",
page.get().id,
cell_idx
@@ -359,7 +359,7 @@ impl BTreeCursor {
let mem_page_rc = self.stack.top();
let cell_idx = self.stack.current_cell_index() as usize;
debug!("current id={} cell={}", mem_page_rc.get().id, cell_idx);
tracing::trace!("current id={} cell={}", mem_page_rc.get().id, cell_idx);
return_if_locked!(mem_page_rc);
if !mem_page_rc.is_loaded() {
self.pager.load_page(mem_page_rc.clone())?;
@@ -846,8 +846,7 @@ impl BTreeCursor {
cell_payload.as_slice(),
cell_idx,
self.usable_space() as u16,
)
.unwrap();
)?;
contents.overflow_cells.len()
};
let write_info = self

View File

@@ -62,37 +62,34 @@ impl DumbLruPageCache {
pub fn insert(&mut self, key: PageCacheKey, value: PageRef) {
self._delete(key.clone(), false);
debug!("cache_insert(key={:?})", key);
let mut entry = Box::new(PageCacheEntry {
let entry = Box::new(PageCacheEntry {
key: key.clone(),
next: None,
prev: None,
page: value,
});
self.touch(&mut entry);
let ptr_raw = Box::into_raw(entry);
let ptr = unsafe { ptr_raw.as_mut().unwrap().as_non_null() };
self.touch(ptr);
if self.map.borrow().len() >= self.capacity {
self.map.borrow_mut().insert(key, ptr);
if self.len() > self.capacity {
self.pop_if_not_dirty();
}
let b = Box::into_raw(entry);
let as_non_null = NonNull::new(b).unwrap();
self.map.borrow_mut().insert(key, as_non_null);
}
pub fn delete(&mut self, key: PageCacheKey) {
debug!("cache_delete(key={:?})", key);
self._delete(key, true)
}
pub fn _delete(&mut self, key: PageCacheKey, clean_page: bool) {
debug!("cache_delete(key={:?}, clean={})", key, clean_page);
let ptr = self.map.borrow_mut().remove(&key);
if ptr.is_none() {
return;
}
let mut ptr = ptr.unwrap();
{
let ptr = unsafe { ptr.as_mut() };
self.detach(ptr, clean_page);
}
let ptr = ptr.unwrap();
self.detach(ptr, clean_page);
unsafe { std::ptr::drop_in_place(ptr.as_ptr()) };
}
@@ -103,13 +100,18 @@ impl DumbLruPageCache {
}
pub fn get(&mut self, key: &PageCacheKey) -> Option<PageRef> {
self.peek(key, true)
}
/// Get page without promoting entry
pub fn peek(&mut self, key: &PageCacheKey, touch: bool) -> Option<PageRef> {
debug!("cache_get(key={:?})", key);
let ptr = self.get_ptr(key);
ptr?;
let ptr = unsafe { ptr.unwrap().as_mut() };
let page = ptr.page.clone();
//self.detach(ptr);
self.touch(ptr);
let mut ptr = self.get_ptr(key)?;
let page = unsafe { ptr.as_mut().page.clone() };
if touch {
self.detach(ptr, false);
self.touch(ptr);
}
Some(page)
}
@@ -118,19 +120,17 @@ impl DumbLruPageCache {
todo!();
}
fn detach(&mut self, entry: &mut PageCacheEntry, clean_page: bool) {
let mut current = entry.as_non_null();
fn detach(&mut self, mut entry: NonNull<PageCacheEntry>, clean_page: bool) {
if clean_page {
// evict buffer
let page = &entry.page;
let page = unsafe { &entry.as_mut().page };
page.clear_loaded();
debug!("cleaning up page {}", page.get().id);
let _ = page.get().contents.take();
}
let (next, prev) = unsafe {
let c = current.as_mut();
let c = entry.as_mut();
let next = c.next;
let prev = c.prev;
c.prev = None;
@@ -140,9 +140,16 @@ impl DumbLruPageCache {
// detach
match (prev, next) {
(None, None) => {}
(None, Some(_)) => todo!(),
(Some(p), None) => {
(None, None) => {
self.head.replace(None);
self.tail.replace(None);
}
(None, Some(mut n)) => {
unsafe { n.as_mut().prev = None };
self.head.borrow_mut().replace(n);
}
(Some(mut p), None) => {
unsafe { p.as_mut().next = None };
self.tail = RefCell::new(Some(p));
}
(Some(mut p), Some(mut n)) => unsafe {
@@ -154,19 +161,20 @@ impl DumbLruPageCache {
};
}
fn touch(&mut self, entry: &mut PageCacheEntry) {
let mut current = entry.as_non_null();
unsafe {
let c = current.as_mut();
c.next = *self.head.borrow();
}
/// inserts into head, assuming we detached first
fn touch(&mut self, mut entry: NonNull<PageCacheEntry>) {
if let Some(mut head) = *self.head.borrow_mut() {
unsafe {
entry.as_mut().next.replace(head);
let head = head.as_mut();
head.prev = Some(current);
head.prev = Some(entry);
}
}
if self.tail.borrow().is_none() {
self.tail.borrow_mut().replace(entry);
}
self.head.borrow_mut().replace(entry);
}
fn pop_if_not_dirty(&mut self) {
@@ -174,12 +182,14 @@ impl DumbLruPageCache {
if tail.is_none() {
return;
}
let tail = unsafe { tail.unwrap().as_mut() };
if tail.page.is_dirty() {
let mut tail = tail.unwrap();
let tail_entry = unsafe { tail.as_mut() };
if tail_entry.page.is_dirty() {
// TODO: drop from another clean entry?
return;
}
self.detach(tail, true);
assert!(self.map.borrow_mut().remove(&tail_entry.key).is_some());
}
pub fn clear(&mut self) {
@@ -188,4 +198,148 @@ impl DumbLruPageCache {
self.delete(key);
}
}
pub fn print(&mut self) {
println!("page_cache={}", self.map.borrow().len());
println!("page_cache={:?}", self.map.borrow())
}
pub fn len(&self) -> usize {
self.map.borrow().len()
}
}
#[cfg(test)]
mod tests {
use std::{num::NonZeroUsize, sync::Arc};
use lru::LruCache;
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
ChaCha8Rng,
};
use crate::{storage::page_cache::DumbLruPageCache, Page};
use super::PageCacheKey;
#[test]
fn test_page_cache_evict() {
let mut cache = DumbLruPageCache::new(1);
let key1 = insert_page(&mut cache, 1);
let key2 = insert_page(&mut cache, 2);
assert_eq!(cache.get(&key2).unwrap().get().id, 2);
assert!(cache.get(&key1).is_none());
}
#[test]
fn test_page_cache_fuzz() {
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let mut rng = ChaCha8Rng::seed_from_u64(seed);
tracing::info!("super seed: {}", seed);
let max_pages = 10;
let mut cache = DumbLruPageCache::new(10);
let mut lru = LruCache::new(NonZeroUsize::new(10).unwrap());
for _ in 0..10000 {
match rng.next_u64() % 3 {
0 => {
// add
let id_page = rng.next_u64() % max_pages;
let id_frame = rng.next_u64() % max_pages;
let key = PageCacheKey::new(id_page as usize, Some(id_frame));
#[allow(clippy::arc_with_non_send_sync)]
let page = Arc::new(Page::new(id_page as usize));
// println!("inserting page {:?}", key);
cache.insert(key.clone(), page.clone());
lru.push(key, page);
assert!(cache.len() <= 10);
}
1 => {
// remove
let random = rng.next_u64() % 2 == 0;
let key = if random || lru.is_empty() {
let id_page = rng.next_u64() % max_pages;
let id_frame = rng.next_u64() % max_pages;
let key = PageCacheKey::new(id_page as usize, Some(id_frame));
key
} else {
let i = rng.next_u64() as usize % lru.len();
let key = lru.iter().skip(i).next().unwrap().0.clone();
key
};
// println!("removing page {:?}", key);
lru.pop(&key);
cache.delete(key);
}
2 => {
// test contents
for (key, page) in &lru {
// println!("getting page {:?}", key);
cache.peek(&key, false).unwrap();
assert_eq!(page.get().id, key.pgno);
}
}
_ => unreachable!(),
}
}
}
#[test]
fn test_page_cache_insert_and_get() {
let mut cache = DumbLruPageCache::new(2);
let key1 = insert_page(&mut cache, 1);
let key2 = insert_page(&mut cache, 2);
assert_eq!(cache.get(&key1).unwrap().get().id, 1);
assert_eq!(cache.get(&key2).unwrap().get().id, 2);
}
#[test]
fn test_page_cache_over_capacity() {
let mut cache = DumbLruPageCache::new(2);
let key1 = insert_page(&mut cache, 1);
let key2 = insert_page(&mut cache, 2);
let key3 = insert_page(&mut cache, 3);
assert!(cache.get(&key1).is_none());
assert_eq!(cache.get(&key2).unwrap().get().id, 2);
assert_eq!(cache.get(&key3).unwrap().get().id, 3);
}
#[test]
fn test_page_cache_delete() {
let mut cache = DumbLruPageCache::new(2);
let key1 = insert_page(&mut cache, 1);
cache.delete(key1.clone());
assert!(cache.get(&key1).is_none());
}
#[test]
fn test_page_cache_clear() {
let mut cache = DumbLruPageCache::new(2);
let key1 = insert_page(&mut cache, 1);
let key2 = insert_page(&mut cache, 2);
cache.clear();
assert!(cache.get(&key1).is_none());
assert!(cache.get(&key2).is_none());
}
fn insert_page(cache: &mut DumbLruPageCache, id: usize) -> PageCacheKey {
let key = PageCacheKey::new(id, None);
#[allow(clippy::arc_with_non_send_sync)]
let page = Arc::new(Page::new(id));
cache.insert(key.clone(), page.clone());
key
}
#[test]
fn test_page_cache_insert_sequential() {
let mut cache = DumbLruPageCache::new(2);
for i in 0..10000 {
let key = insert_page(&mut cache, i);
assert_eq!(cache.peek(&key, false).unwrap().get().id, i);
}
}
}

View File

@@ -122,7 +122,7 @@ impl Page {
}
}
#[derive(Clone)]
#[derive(Clone, Debug)]
enum FlushState {
Start,
WaitAppendFrames,
@@ -247,6 +247,7 @@ impl Pager {
match checkpoint_status {
CheckpointStatus::IO => Ok(checkpoint_status),
CheckpointStatus::Done(_) => {
self.wal.borrow().end_write_tx()?;
self.wal.borrow().end_read_tx()?;
Ok(checkpoint_status)
}
@@ -260,11 +261,11 @@ impl Pager {
/// Reads a page from the database.
pub fn read_page(&self, page_idx: usize) -> Result<PageRef> {
trace!("read_page(page_idx = {})", page_idx);
tracing::debug!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame()));
if let Some(page) = page_cache.get(&page_key) {
trace!("read_page(page_idx = {}) = cached", page_idx);
tracing::debug!("read_page(page_idx = {}) = cached", page_idx);
return Ok(page.clone());
}
let page = Arc::new(Page::new(page_idx));
@@ -347,6 +348,7 @@ impl Pager {
let mut checkpoint_result = CheckpointResult::new();
loop {
let state = self.flush_info.borrow().state.clone();
trace!("cacheflush {:?}", state);
match state {
FlushState::Start => {
let db_size = self.db_header.lock().unwrap().database_size;
@@ -362,6 +364,10 @@ impl Pager {
db_size,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
// This page is no longer valid.
// For example:
// We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame so it must be invalidated.
cache.delete(page_key);
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;

View File

@@ -454,25 +454,25 @@ impl PageContent {
}
pub fn write_u8(&self, pos: usize, value: u8) {
tracing::debug!("write_u8(pos={}, value={})", pos, value);
tracing::trace!("write_u8(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos] = value;
}
pub fn write_u16(&self, pos: usize, value: u16) {
tracing::debug!("write_u16(pos={}, value={})", pos, value);
tracing::trace!("write_u16(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos..self.offset + pos + 2].copy_from_slice(&value.to_be_bytes());
}
pub fn write_u16_no_offset(&self, pos: usize, value: u16) {
tracing::debug!("write_u16(pos={}, value={})", pos, value);
tracing::trace!("write_u16(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[pos..pos + 2].copy_from_slice(&value.to_be_bytes());
}
pub fn write_u32(&self, pos: usize, value: u32) {
tracing::debug!("write_u32(pos={}, value={})", pos, value);
tracing::trace!("write_u32(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos..self.offset + pos + 4].copy_from_slice(&value.to_be_bytes());
}
@@ -562,7 +562,7 @@ impl PageContent {
payload_overflow_threshold_min: usize,
usable_size: usize,
) -> Result<BTreeCell> {
tracing::debug!("cell_get(idx={})", idx);
tracing::trace!("cell_get(idx={})", idx);
let buf = self.as_ptr();
let ncells = self.cell_count();
@@ -1308,7 +1308,7 @@ pub fn begin_read_wal_frame(
let frame = page.clone();
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
let frame = frame.clone();
finish_read_page(2, buf, frame).unwrap();
finish_read_page(page.get().id, buf, frame).unwrap();
});
let c = Completion::Read(ReadCompletion::new(buf, complete));
io.pread(offset, c)?;

View File

@@ -176,6 +176,7 @@ pub trait Wal {
mode: CheckpointMode,
) -> Result<CheckpointStatus>;
fn sync(&mut self) -> Result<CheckpointStatus>;
fn get_max_frame_in_wal(&self) -> u64;
fn get_max_frame(&self) -> u64;
fn get_min_frame(&self) -> u64;
}
@@ -333,8 +334,8 @@ impl Wal for WalFile {
}
}
// If we didn't find any mark, then let's add a new one
if max_read_mark_index == -1 {
// If we didn't find any mark or we can update, let's update them
if (max_read_mark as u64) < max_frame_in_wal || max_read_mark_index == -1 {
for (index, lock) in shared.read_locks.iter_mut().enumerate() {
let busy = !lock.write();
if !busy {
@@ -361,10 +362,11 @@ impl Wal for WalFile {
self.max_frame = max_read_mark as u64;
self.min_frame = shared.nbackfills + 1;
tracing::debug!(
"begin_read_tx(min_frame={}, max_frame={}, lock={})",
"begin_read_tx(min_frame={}, max_frame={}, lock={}, max_frame_in_wal={})",
self.min_frame,
self.max_frame,
self.max_frame_read_lock_index
self.max_frame_read_lock_index,
max_frame_in_wal
);
Ok(LimboResult::Ok)
}
@@ -500,14 +502,18 @@ impl Wal for WalFile {
// TODO(pere): check what frames are safe to checkpoint between many readers!
self.ongoing_checkpoint.min_frame = self.min_frame;
let mut shared = self.shared.write();
let max_frame_in_wal = shared.max_frame as u32;
let mut max_safe_frame = shared.max_frame;
for read_lock in shared.read_locks.iter_mut() {
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate() {
let this_mark = read_lock.value.load(Ordering::SeqCst);
if this_mark < max_safe_frame as u32 {
let busy = !read_lock.write();
if !busy {
read_lock.value.store(max_frame_in_wal, Ordering::SeqCst);
let new_mark = if read_lock_idx == 0 {
max_safe_frame as u32
} else {
READMARK_NOT_USED
};
read_lock.value.store(new_mark, Ordering::SeqCst);
read_lock.unlock();
} else {
max_safe_frame = this_mark as u64;
@@ -613,6 +619,7 @@ impl Wal for WalFile {
shared.pages_in_frames.clear();
shared.max_frame = 0;
shared.nbackfills = 0;
// TODO(pere): truncate wal file here.
} else {
shared.nbackfills = self.ongoing_checkpoint.max_frame;
}
@@ -658,6 +665,10 @@ impl Wal for WalFile {
}
}
fn get_max_frame_in_wal(&self) -> u64 {
self.shared.read().max_frame
}
fn get_max_frame(&self) -> u64 {
self.max_frame
}

View File

@@ -3200,6 +3200,7 @@ impl Program {
connection.deref(),
),
TransactionState::Read => {
connection.transaction_state.replace(TransactionState::None);
pager.end_read_tx()?;
Ok(StepResult::Done)
}
@@ -3226,19 +3227,20 @@ impl Program {
let checkpoint_status = pager.end_tx()?;
match checkpoint_status {
CheckpointStatus::Done(_) => {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
}
}
connection.transaction_state.replace(TransactionState::None);
let _ = halt_state.take();
}
CheckpointStatus::IO => {
tracing::trace!("Checkpointing IO");
*halt_state = Some(HaltState::Checkpointing);
return Ok(StepResult::IO);
}
}
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
}
}
Ok(StepResult::Done)
}
}

View File

@@ -122,7 +122,10 @@ pub unsafe extern "C" fn sqlite3_open(
*db_out = Box::leak(Box::new(sqlite3::new(db, conn)));
SQLITE_OK
}
Err(_e) => SQLITE_CANTOPEN,
Err(e) => {
log::error!("error opening database {:?}", e);
SQLITE_CANTOPEN
}
}
}
@@ -1125,7 +1128,7 @@ mod tests {
unsafe {
let mut db = ptr::null_mut();
assert_eq!(
sqlite3_open(b"../../testing/testing.db\0".as_ptr() as *const i8, &mut db),
sqlite3_open(b"../testing/testing.db\0".as_ptr() as *const i8, &mut db),
SQLITE_OK
);
assert_eq!(sqlite3_close(db), SQLITE_OK);
@@ -1144,7 +1147,7 @@ mod tests {
unsafe {
let mut db = ptr::null_mut();
assert_eq!(
sqlite3_open(b"../../testing/testing.db\0".as_ptr() as *const i8, &mut db),
sqlite3_open(b"../testing/testing.db\0".as_ptr() as *const i8, &mut db),
SQLITE_OK
);
@@ -1177,7 +1180,7 @@ mod tests {
// Test with valid db
let mut db = ptr::null_mut();
assert_eq!(
sqlite3_open(b"../../testing/testing.db\0".as_ptr() as *const i8, &mut db),
sqlite3_open(b"../testing/testing.db\0".as_ptr() as *const i8, &mut db),
SQLITE_OK
);
assert_eq!(sqlite3_wal_checkpoint(db, ptr::null()), SQLITE_OK);
@@ -1203,7 +1206,7 @@ mod tests {
// Test with valid db
let mut db = ptr::null_mut();
assert_eq!(
sqlite3_open(b"../../testing/testing.db\0".as_ptr() as *const i8, &mut db),
sqlite3_open(b"../testing/testing.db\0".as_ptr() as *const i8, &mut db),
SQLITE_OK
);

View File

@@ -154,7 +154,7 @@ fn test_sequential_overflow_page() -> anyhow::Result<()> {
}
#[ignore]
#[test]
#[test_log::test]
fn test_sequential_write() -> anyhow::Result<()> {
let _ = env_logger::try_init();
@@ -164,7 +164,7 @@ fn test_sequential_write() -> anyhow::Result<()> {
let list_query = "SELECT * FROM test";
let max_iterations = 10000;
for i in 0..max_iterations {
debug!("inserting {} ", i);
println!("inserting {} ", i);
if (i % 100) == 0 {
let progress = (i as f64 / max_iterations as f64) * 100.0;
println!("progress {:.1}%", progress);