From be3badc1f301025e59812352675d92a5fb926e78 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 5 Mar 2025 22:52:44 +0100 Subject: [PATCH 01/12] modify a few btree log level and add end_write_txn after checkpoint --- core/storage/btree.rs | 4 ++-- core/storage/pager.rs | 1 + core/storage/sqlite3_ondisk.rs | 10 +++++----- core/vdbe/mod.rs | 1 + tests/integration/query_processing/test_write_path.rs | 4 ++-- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 0069a8f76..4b1fe68c4 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -284,7 +284,7 @@ impl BTreeCursor { } let cell_idx = cell_idx as usize; - debug!( + tracing::trace!( "get_prev_record current id={} cell={}", page.get().id, cell_idx @@ -359,7 +359,7 @@ impl BTreeCursor { let mem_page_rc = self.stack.top(); let cell_idx = self.stack.current_cell_index() as usize; - debug!("current id={} cell={}", mem_page_rc.get().id, cell_idx); + tracing::trace!("current id={} cell={}", mem_page_rc.get().id, cell_idx); return_if_locked!(mem_page_rc); if !mem_page_rc.is_loaded() { self.pager.load_page(mem_page_rc.clone())?; diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 54ef933de..eb4255a3b 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -247,6 +247,7 @@ impl Pager { match checkpoint_status { CheckpointStatus::IO => Ok(checkpoint_status), CheckpointStatus::Done(_) => { + self.wal.borrow().end_write_tx()?; self.wal.borrow().end_read_tx()?; Ok(checkpoint_status) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index a55d180f4..066742417 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -454,25 +454,25 @@ impl PageContent { } pub fn write_u8(&self, pos: usize, value: u8) { - tracing::debug!("write_u8(pos={}, value={})", pos, value); + tracing::trace!("write_u8(pos={}, value={})", pos, value); let buf = self.as_ptr(); buf[self.offset + pos] = value; 
} pub fn write_u16(&self, pos: usize, value: u16) { - tracing::debug!("write_u16(pos={}, value={})", pos, value); + tracing::trace!("write_u16(pos={}, value={})", pos, value); let buf = self.as_ptr(); buf[self.offset + pos..self.offset + pos + 2].copy_from_slice(&value.to_be_bytes()); } pub fn write_u16_no_offset(&self, pos: usize, value: u16) { - tracing::debug!("write_u16(pos={}, value={})", pos, value); + tracing::trace!("write_u16(pos={}, value={})", pos, value); let buf = self.as_ptr(); buf[pos..pos + 2].copy_from_slice(&value.to_be_bytes()); } pub fn write_u32(&self, pos: usize, value: u32) { - tracing::debug!("write_u32(pos={}, value={})", pos, value); + tracing::trace!("write_u32(pos={}, value={})", pos, value); let buf = self.as_ptr(); buf[self.offset + pos..self.offset + pos + 4].copy_from_slice(&value.to_be_bytes()); } @@ -562,7 +562,7 @@ impl PageContent { payload_overflow_threshold_min: usize, usable_size: usize, ) -> Result { - tracing::debug!("cell_get(idx={})", idx); + tracing::trace!("cell_get(idx={})", idx); let buf = self.as_ptr(); let ncells = self.cell_count(); diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index b65bd80d5..b4fc94d46 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -3200,6 +3200,7 @@ impl Program { connection.deref(), ), TransactionState::Read => { + connection.transaction_state.replace(TransactionState::None); pager.end_read_tx()?; Ok(StepResult::Done) } diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index 81e47de27..dd313cfea 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -154,7 +154,7 @@ fn test_sequential_overflow_page() -> anyhow::Result<()> { } #[ignore] -#[test] +#[test_log::test] fn test_sequential_write() -> anyhow::Result<()> { let _ = env_logger::try_init(); @@ -164,7 +164,7 @@ fn test_sequential_write() -> anyhow::Result<()> { let list_query = "SELECT 
* FROM test"; let max_iterations = 10000; for i in 0..max_iterations { - debug!("inserting {} ", i); + println!("inserting {} ", i); if (i % 100) == 0 { let progress = (i as f64 / max_iterations as f64) * 100.0; println!("progress {:.1}%", progress); From deaff6c1ec8cb8847b5762aa2d5ad733c2c77b1c Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 15:39:03 +0100 Subject: [PATCH 02/12] Fix detachment of nodes in lru cache. --- Cargo.lock | 18 +++ core/Cargo.toml | 1 + core/storage/page_cache.rs | 226 +++++++++++++++++++++++++++++++------ 3 files changed, 209 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c12346524..6efd9e93a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,6 +48,12 @@ dependencies = [ "equator", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "anarchist-readable-name-generator-lib" version = "0.1.2" @@ -1083,6 +1089,8 @@ version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" dependencies = [ + "allocator-api2", + "equivalent", "foldhash", ] @@ -1658,6 +1666,7 @@ dependencies = [ "limbo_sqlite3_parser", "limbo_time", "limbo_uuid", + "lru", "miette", "mimalloc", "parking_lot", @@ -1887,6 +1896,15 @@ version = "0.4.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" +[[package]] +name = "lru" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "227748d55f2f0ab4735d87fd623798cb6b664512fe979705f829c9f81c934465" +dependencies = [ + "hashbrown", +] + [[package]] name = "matchers" version = "0.1.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 0914142a1..7be562867 100644 --- a/core/Cargo.toml 
+++ b/core/Cargo.toml @@ -97,6 +97,7 @@ rand = "0.8.5" # Required for quickcheck rand_chacha = "0.9.0" env_logger = "0.11.6" test-log = { version = "0.2.17", features = ["trace"] } +lru = "0.13.0" [[bench]] name = "benchmark" diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 2ead4a58b..187786f6e 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -62,37 +62,34 @@ impl DumbLruPageCache { pub fn insert(&mut self, key: PageCacheKey, value: PageRef) { self._delete(key.clone(), false); debug!("cache_insert(key={:?})", key); - let mut entry = Box::new(PageCacheEntry { + let entry = Box::new(PageCacheEntry { key: key.clone(), next: None, prev: None, page: value, }); - self.touch(&mut entry); + let ptr_raw = Box::into_raw(entry); + let ptr = unsafe { ptr_raw.as_mut().unwrap().as_non_null() }; + self.touch(ptr); - if self.map.borrow().len() >= self.capacity { + self.map.borrow_mut().insert(key, ptr); + if self.len() > self.capacity { self.pop_if_not_dirty(); } - let b = Box::into_raw(entry); - let as_non_null = NonNull::new(b).unwrap(); - self.map.borrow_mut().insert(key, as_non_null); } pub fn delete(&mut self, key: PageCacheKey) { + debug!("cache_delete(key={:?})", key); self._delete(key, true) } pub fn _delete(&mut self, key: PageCacheKey, clean_page: bool) { - debug!("cache_delete(key={:?}, clean={})", key, clean_page); let ptr = self.map.borrow_mut().remove(&key); if ptr.is_none() { return; } - let mut ptr = ptr.unwrap(); - { - let ptr = unsafe { ptr.as_mut() }; - self.detach(ptr, clean_page); - } + let ptr = ptr.unwrap(); + self.detach(ptr, clean_page); unsafe { std::ptr::drop_in_place(ptr.as_ptr()) }; } @@ -103,13 +100,18 @@ impl DumbLruPageCache { } pub fn get(&mut self, key: &PageCacheKey) -> Option { + self.peek(key, true) + } + + /// Get page without promoting entry + pub fn peek(&mut self, key: &PageCacheKey, touch: bool) -> Option { debug!("cache_get(key={:?})", key); - let ptr = self.get_ptr(key); - ptr?; - let 
ptr = unsafe { ptr.unwrap().as_mut() }; - let page = ptr.page.clone(); - //self.detach(ptr); - self.touch(ptr); + let mut ptr = self.get_ptr(key)?; + let page = unsafe { ptr.as_mut().page.clone() }; + if touch { + self.detach(ptr, false); + self.touch(ptr); + } Some(page) } @@ -118,19 +120,17 @@ impl DumbLruPageCache { todo!(); } - fn detach(&mut self, entry: &mut PageCacheEntry, clean_page: bool) { - let mut current = entry.as_non_null(); - + fn detach(&mut self, mut entry: NonNull, clean_page: bool) { if clean_page { // evict buffer - let page = &entry.page; + let page = unsafe { &entry.as_mut().page }; page.clear_loaded(); debug!("cleaning up page {}", page.get().id); let _ = page.get().contents.take(); } let (next, prev) = unsafe { - let c = current.as_mut(); + let c = entry.as_mut(); let next = c.next; let prev = c.prev; c.prev = None; @@ -140,9 +140,16 @@ impl DumbLruPageCache { // detach match (prev, next) { - (None, None) => {} - (None, Some(_)) => todo!(), - (Some(p), None) => { + (None, None) => { + self.head.replace(None); + self.tail.replace(None); + } + (None, Some(mut n)) => { + unsafe { n.as_mut().prev = None }; + self.head.borrow_mut().replace(n); + } + (Some(mut p), None) => { + unsafe { p.as_mut().next = None }; self.tail = RefCell::new(Some(p)); } (Some(mut p), Some(mut n)) => unsafe { @@ -154,19 +161,20 @@ impl DumbLruPageCache { }; } - fn touch(&mut self, entry: &mut PageCacheEntry) { - let mut current = entry.as_non_null(); - unsafe { - let c = current.as_mut(); - c.next = *self.head.borrow(); - } - + /// inserts into head, assuming we detached first + fn touch(&mut self, mut entry: NonNull) { if let Some(mut head) = *self.head.borrow_mut() { unsafe { + entry.as_mut().next.replace(head); let head = head.as_mut(); - head.prev = Some(current); + head.prev = Some(entry); } } + + if self.tail.borrow().is_none() { + self.tail.borrow_mut().replace(entry); + } + self.head.borrow_mut().replace(entry); } fn pop_if_not_dirty(&mut self) { @@ -174,12 
+182,14 @@ impl DumbLruPageCache { if tail.is_none() { return; } - let tail = unsafe { tail.unwrap().as_mut() }; - if tail.page.is_dirty() { + let mut tail = tail.unwrap(); + let tail_entry = unsafe { tail.as_mut() }; + if tail_entry.page.is_dirty() { // TODO: drop from another clean entry? return; } self.detach(tail, true); + assert!(self.map.borrow_mut().remove(&tail_entry.key).is_some()); } pub fn clear(&mut self) { @@ -188,4 +198,148 @@ impl DumbLruPageCache { self.delete(key); } } + + pub fn print(&mut self) { + println!("page_cache={}", self.map.borrow().len()); + println!("page_cache={:?}", self.map.borrow()) + } + + pub fn len(&self) -> usize { + self.map.borrow().len() + } +} + +#[cfg(test)] +mod tests { + use std::{num::NonZeroUsize, sync::Arc}; + + use lru::LruCache; + use rand_chacha::{ + rand_core::{RngCore, SeedableRng}, + ChaCha8Rng, + }; + + use crate::{storage::page_cache::DumbLruPageCache, Page}; + + use super::PageCacheKey; + + #[test] + fn test_page_cache_evict() { + let mut cache = DumbLruPageCache::new(1); + let key1 = insert_page(&mut cache, 1); + let key2 = insert_page(&mut cache, 2); + assert_eq!(cache.get(&key2).unwrap().get().id, 2); + assert!(cache.get(&key1).is_none()); + } + + #[test] + fn test_page_cache_fuzz() { + let seed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let mut rng = ChaCha8Rng::seed_from_u64(seed); + tracing::info!("super seed: {}", seed); + let max_pages = 10; + let mut cache = DumbLruPageCache::new(10); + let mut lru = LruCache::new(NonZeroUsize::new(10).unwrap()); + + for _ in 0..10000 { + match rng.next_u64() % 3 { + 0 => { + // add + let id_page = rng.next_u64() % max_pages; + let id_frame = rng.next_u64() % max_pages; + let key = PageCacheKey::new(id_page as usize, Some(id_frame)); + let page = Arc::new(Page::new(id_page as usize)); + // println!("inserting page {:?}", key); + cache.insert(key.clone(), page.clone()); + lru.push(key, page); + 
assert!(cache.len() <= 10); + } + 1 => { + // remove + let random = rng.next_u64() % 1 == 0; + let key = if random { + let id_page = rng.next_u64() % max_pages; + let id_frame = rng.next_u64() % max_pages; + let key = PageCacheKey::new(id_page as usize, Some(id_frame)); + key + } else { + let i = rng.next_u64() as usize % lru.len(); + let key = lru.iter().skip(i).next().unwrap().0.clone(); + key + }; + // println!("removing page {:?}", key); + lru.pop(&key); + cache.delete(key); + } + 2 => { + // cache.print(); + // println!("lru={:?}", lru); + // test contents + for (key, page) in &lru { + // println!("getting page {:?}", key); + cache.peek(&key, false).unwrap(); + assert_eq!(page.get().id, key.pgno); + } + } + _ => unreachable!(), + } + } + } + + #[test] + fn test_page_cache_insert_and_get() { + let mut cache = DumbLruPageCache::new(2); + let key1 = insert_page(&mut cache, 1); + let key2 = insert_page(&mut cache, 2); + assert_eq!(cache.get(&key1).unwrap().get().id, 1); + assert_eq!(cache.get(&key2).unwrap().get().id, 2); + } + + #[test] + fn test_page_cache_over_capacity() { + let mut cache = DumbLruPageCache::new(2); + let key1 = insert_page(&mut cache, 1); + let key2 = insert_page(&mut cache, 2); + let key3 = insert_page(&mut cache, 3); + assert!(cache.get(&key1).is_none()); + assert_eq!(cache.get(&key2).unwrap().get().id, 2); + assert_eq!(cache.get(&key3).unwrap().get().id, 3); + } + + #[test] + fn test_page_cache_delete() { + let mut cache = DumbLruPageCache::new(2); + let key1 = insert_page(&mut cache, 1); + cache.delete(key1.clone()); + assert!(cache.get(&key1).is_none()); + } + + #[test] + fn test_page_cache_clear() { + let mut cache = DumbLruPageCache::new(2); + let key1 = insert_page(&mut cache, 1); + let key2 = insert_page(&mut cache, 2); + cache.clear(); + assert!(cache.get(&key1).is_none()); + assert!(cache.get(&key2).is_none()); + } + + fn insert_page(cache: &mut DumbLruPageCache, id: usize) -> PageCacheKey { + let key = PageCacheKey::new(id, None); 
+ let page = Arc::new(Page::new(id)); + cache.insert(key.clone(), page.clone()); + key + } + + #[test] + fn test_page_cache_insert_sequential() { + let mut cache = DumbLruPageCache::new(2); + for i in 0..10000 { + let key = insert_page(&mut cache, i); + // assert_eq!(cache.peek(&key, false).unwrap().get().id, i); + } + } } From 825907bfacb8608218c45f9df7aa6e08720ae585 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 15:40:37 +0100 Subject: [PATCH 03/12] Invalidate cache entry after checkpoint of frame completes --- core/storage/btree.rs | 3 +-- core/storage/pager.rs | 11 ++++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 4b1fe68c4..4ae59cd74 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -846,8 +846,7 @@ impl BTreeCursor { cell_payload.as_slice(), cell_idx, self.usable_space() as u16, - ) - .unwrap(); + )?; contents.overflow_cells.len() }; let write_info = self diff --git a/core/storage/pager.rs b/core/storage/pager.rs index eb4255a3b..5c43eee1a 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -122,7 +122,7 @@ impl Page { } } -#[derive(Clone)] +#[derive(Clone, Debug)] enum FlushState { Start, WaitAppendFrames, @@ -261,11 +261,11 @@ impl Pager { /// Reads a page from the database. 
pub fn read_page(&self, page_idx: usize) -> Result { - trace!("read_page(page_idx = {})", page_idx); + tracing::debug!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.write(); let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame())); if let Some(page) = page_cache.get(&page_key) { - trace!("read_page(page_idx = {}) = cached", page_idx); + tracing::debug!("read_page(page_idx = {}) = cached", page_idx); return Ok(page.clone()); } let page = Arc::new(Page::new(page_idx)); @@ -348,6 +348,7 @@ impl Pager { let mut checkpoint_result = CheckpointResult::new(); loop { let state = self.flush_info.borrow().state.clone(); + trace!("cacheflush {:?}", state); match state { FlushState::Start => { let db_size = self.db_header.lock().unwrap().database_size; @@ -363,6 +364,10 @@ impl Pager { db_size, self.flush_info.borrow().in_flight_writes.clone(), )?; + // This page is no longer valid. + // For example: + // We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame so it must be invalidated. 
+ cache.delete(page_key); } self.dirty_pages.borrow_mut().clear(); self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames; From f9916e8149e76183a9d664f162bc52aeda7d3f46 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 15:41:17 +0100 Subject: [PATCH 04/12] update max frame in case we got a read lock with outdated max frame --- core/storage/wal.rs | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index e9e610253..d6442b2bf 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -176,6 +176,7 @@ pub trait Wal { mode: CheckpointMode, ) -> Result; fn sync(&mut self) -> Result; + fn get_max_frame_in_wal(&self) -> u64; fn get_max_frame(&self) -> u64; fn get_min_frame(&self) -> u64; } @@ -333,8 +334,8 @@ impl Wal for WalFile { } } - // If we didn't find any mark, then let's add a new one - if max_read_mark_index == -1 { + // If we didn't find any mark or we can update, let's update them + if (max_read_mark as u64) < max_frame_in_wal || max_read_mark_index == -1 { for (index, lock) in shared.read_locks.iter_mut().enumerate() { let busy = !lock.write(); if !busy { @@ -361,10 +362,11 @@ impl Wal for WalFile { self.max_frame = max_read_mark as u64; self.min_frame = shared.nbackfills + 1; tracing::debug!( - "begin_read_tx(min_frame={}, max_frame={}, lock={})", + "begin_read_tx(min_frame={}, max_frame={}, lock={}, max_frame_in_wal={})", self.min_frame, self.max_frame, - self.max_frame_read_lock_index + self.max_frame_read_lock_index, + max_frame_in_wal ); Ok(LimboResult::Ok) } @@ -500,14 +502,18 @@ impl Wal for WalFile { // TODO(pere): check what frames are safe to checkpoint between many readers! 
self.ongoing_checkpoint.min_frame = self.min_frame; let mut shared = self.shared.write(); - let max_frame_in_wal = shared.max_frame as u32; let mut max_safe_frame = shared.max_frame; - for read_lock in shared.read_locks.iter_mut() { + for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate() { let this_mark = read_lock.value.load(Ordering::SeqCst); if this_mark < max_safe_frame as u32 { let busy = !read_lock.write(); if !busy { - read_lock.value.store(max_frame_in_wal, Ordering::SeqCst); + let new_mark = if read_lock_idx == 0 { + max_safe_frame as u32 + } else { + READMARK_NOT_USED + }; + read_lock.value.store(new_mark, Ordering::SeqCst); read_lock.unlock(); } else { max_safe_frame = this_mark as u64; @@ -613,6 +619,7 @@ impl Wal for WalFile { shared.pages_in_frames.clear(); shared.max_frame = 0; shared.nbackfills = 0; + // TODO(pere): truncate wal file here. } else { shared.nbackfills = self.ongoing_checkpoint.max_frame; } @@ -658,6 +665,10 @@ impl Wal for WalFile { } } + fn get_max_frame_in_wal(&self) -> u64 { + self.shared.read().max_frame + } + fn get_max_frame(&self) -> u64 { self.max_frame } From cc320a74ca914ebdefd1cc9de207da1781d2848b Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 15:41:28 +0100 Subject: [PATCH 05/12] few checkpoint result cleanup in vdbe --- core/vdbe/mod.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index b4fc94d46..d5c30f9f0 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -3227,19 +3227,20 @@ impl Program { let checkpoint_status = pager.end_tx()?; match checkpoint_status { CheckpointStatus::Done(_) => { + if self.change_cnt_on { + if let Some(conn) = self.connection.upgrade() { + conn.set_changes(self.n_change.get()); + } + } connection.transaction_state.replace(TransactionState::None); let _ = halt_state.take(); } CheckpointStatus::IO => { + tracing::trace!("Checkpointing IO"); *halt_state = 
Some(HaltState::Checkpointing); return Ok(StepResult::IO); } } - if self.change_cnt_on { - if let Some(conn) = self.connection.upgrade() { - conn.set_changes(self.n_change.get()); - } - } Ok(StepResult::Done) } } From 8127927775b767e5f2d493c878f01a84dd964733 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 15:44:13 +0100 Subject: [PATCH 06/12] remove modulo by 1 error --- core/storage/page_cache.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 187786f6e..b48cecf5f 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -259,8 +259,8 @@ mod tests { } 1 => { // remove - let random = rng.next_u64() % 1 == 0; - let key = if random { + let random = rng.next_u64() % 2 == 0; + let key = if random || lru.is_empty() { let id_page = rng.next_u64() % max_pages; let id_frame = rng.next_u64() % max_pages; let key = PageCacheKey::new(id_page as usize, Some(id_frame)); @@ -275,8 +275,6 @@ mod tests { cache.delete(key); } 2 => { - // cache.print(); - // println!("lru={:?}", lru); // test contents for (key, page) in &lru { // println!("getting page {:?}", key); @@ -339,7 +337,7 @@ mod tests { let mut cache = DumbLruPageCache::new(2); for i in 0..10000 { let key = insert_page(&mut cache, i); - // assert_eq!(cache.peek(&key, false).unwrap().get().id, i); + assert_eq!(cache.peek(&key, false).unwrap().get().id, i); } } } From 40a78c32b6a8f2def490c6c23acb8de5d33cee89 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 16:00:46 +0100 Subject: [PATCH 07/12] fix sqlite3 lib db test path --- sqlite3/src/lib.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 097292708..78cc831b9 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -122,7 +122,10 @@ pub unsafe extern "C" fn sqlite3_open( *db_out = Box::leak(Box::new(sqlite3::new(db, conn))); SQLITE_OK } - Err(_e) => 
SQLITE_CANTOPEN, + Err(e) => { + log::error!("error opening database {:?}", e); + SQLITE_CANTOPEN + }, } } @@ -1144,7 +1147,7 @@ mod tests { unsafe { let mut db = ptr::null_mut(); assert_eq!( - sqlite3_open(b"../../testing/testing.db\0".as_ptr() as *const i8, &mut db), + sqlite3_open(b"../testing/testing.db\0".as_ptr() as *const i8, &mut db), SQLITE_OK ); @@ -1177,7 +1180,7 @@ mod tests { // Test with valid db let mut db = ptr::null_mut(); assert_eq!( - sqlite3_open(b"../../testing/testing.db\0".as_ptr() as *const i8, &mut db), + sqlite3_open(b"../testing/testing.db\0".as_ptr() as *const i8, &mut db), SQLITE_OK ); assert_eq!(sqlite3_wal_checkpoint(db, ptr::null()), SQLITE_OK); @@ -1203,7 +1206,7 @@ mod tests { // Test with valid db let mut db = ptr::null_mut(); assert_eq!( - sqlite3_open(b"../../testing/testing.db\0".as_ptr() as *const i8, &mut db), + sqlite3_open(b"../testing/testing.db\0".as_ptr() as *const i8, &mut db), SQLITE_OK ); From 1af6dccc725c9ef908040a72273d212a4ac09f5a Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 16:02:04 +0100 Subject: [PATCH 08/12] allow arc in tests --- core/storage/page_cache.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index b48cecf5f..2004c511a 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -251,6 +251,7 @@ mod tests { let id_page = rng.next_u64() % max_pages; let id_frame = rng.next_u64() % max_pages; let key = PageCacheKey::new(id_page as usize, Some(id_frame)); + #[allow(clippy::arc_with_non_send_sync)] let page = Arc::new(Page::new(id_page as usize)); // println!("inserting page {:?}", key); cache.insert(key.clone(), page.clone()); @@ -327,6 +328,7 @@ mod tests { fn insert_page(cache: &mut DumbLruPageCache, id: usize) -> PageCacheKey { let key = PageCacheKey::new(id, None); + #[allow(clippy::arc_with_non_send_sync)] let page = Arc::new(Page::new(id)); cache.insert(key.clone(), page.clone()); key From 
e65d76f51fe0a0ece20ccc0a3fcaf1558fd68dbf Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 16:06:29 +0100 Subject: [PATCH 09/12] fmt --- sqlite3/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 78cc831b9..a40dd829d 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -125,7 +125,7 @@ pub unsafe extern "C" fn sqlite3_open( Err(e) => { log::error!("error opening database {:?}", e); SQLITE_CANTOPEN - }, + } } } From f7c8d4cc69161c993407331f6c7f7827d9ae8d8f Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 16:06:52 +0100 Subject: [PATCH 10/12] test_open_existing fix --- sqlite3/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index a40dd829d..2290464bf 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -1128,7 +1128,7 @@ mod tests { unsafe { let mut db = ptr::null_mut(); assert_eq!( - sqlite3_open(b"../../testing/testing.db\0".as_ptr() as *const i8, &mut db), + sqlite3_open(b"../testing/testing.db\0".as_ptr() as *const i8, &mut db), SQLITE_OK ); assert_eq!(sqlite3_close(db), SQLITE_OK); From 2fd790a05526cdd8835fa064533ed4a9cab62467 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 16:26:29 +0100 Subject: [PATCH 11/12] make execute command loop until done --- core/lib.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/core/lib.rs b/core/lib.rs index 64c86f33a..c82902a21 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -387,6 +387,8 @@ impl Connection { QueryRunner::new(self, sql) } + /// Execute will run a query from start to finish taking ownership of I/O because it will run pending I/Os if it didn't finish. 
+ /// TODO: make this api async pub fn execute(self: &Rc, sql: impl AsRef) -> Result<()> { let sql = sql.as_ref(); let mut parser = Parser::new(sql.as_bytes()); @@ -428,7 +430,17 @@ impl Connection { let mut state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len()); - program.step(&mut state, self._db.mv_store.clone(), self.pager.clone())?; + loop { + let res = program.step( + &mut state, + self._db.mv_store.clone(), + self.pager.clone(), + )?; + if matches!(res, StepResult::Done) { + break; + } + self._db.io.run_once()?; + } } } } From aa4703c442538c54bcb8ab3b9bd205ace033c8ac Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 17:24:59 +0100 Subject: [PATCH 12/12] Fix read frame setting wrong offset When I added frame reading support I thought, okay, who cares about the page id of this page if we read it from a frame because we don't need it to compute the offset to read from the file in this case. Fuck me, because it was needed in case we read `page 1` from WAL because it has a different `offset`. --- core/storage/sqlite3_ondisk.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 066742417..d3c7ff431 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1308,7 +1308,7 @@ pub fn begin_read_wal_frame( let frame = page.clone(); let complete = Box::new(move |buf: Arc>| { let frame = frame.clone(); - finish_read_page(2, buf, frame).unwrap(); + finish_read_page(page.get().id, buf, frame).unwrap(); }); let c = Completion::Read(ReadCompletion::new(buf, complete)); io.pread(offset, c)?;