diff --git a/core/storage/pager.rs b/core/storage/pager.rs
index 01663ffb4..01c38396d 100644
--- a/core/storage/pager.rs
+++ b/core/storage/pager.rs
@@ -217,12 +217,14 @@ impl Page {
 
     pub fn set_dirty(&self) {
         tracing::debug!("set_dirty(page={})", self.get().id);
+        self.clear_wal_tag();
         self.get().flags.fetch_or(PAGE_DIRTY, Ordering::Release);
     }
 
     pub fn clear_dirty(&self) {
         tracing::debug!("clear_dirty(page={})", self.get().id);
         self.get().flags.fetch_and(!PAGE_DIRTY, Ordering::Release);
+        self.clear_wal_tag();
     }
 
     pub fn is_loaded(&self) -> bool {
@@ -308,7 +310,7 @@ impl Page {
     #[inline]
     pub fn is_valid_for_checkpoint(&self, target_frame: u64, seq: u32) -> bool {
         let (f, s) = self.wal_tag_pair();
-        f == target_frame && s == seq && !self.is_dirty()
+        f == target_frame && s == seq && !self.is_dirty() && self.is_loaded() && !self.is_locked()
     }
 }
 
@@ -1196,12 +1198,9 @@ impl Pager {
         let page_key = PageCacheKey::new(page_idx);
         let page = page_cache.get(&page_key)?.and_then(|page| {
             if page.is_valid_for_checkpoint(target_frame, seq) {
-                tracing::trace!(
-                    "cache_get_for_checkpoint: page {} frame {} is valid",
-                    page_idx,
-                    target_frame
+                tracing::debug!(
+                    "cache_get_for_checkpoint: page {page_idx} frame {target_frame} is valid",
                 );
-                page.pin();
                 Some(page.clone())
             } else {
                 tracing::trace!(
@@ -2110,6 +2109,7 @@ impl Pager {
             ))
         })?;
         page.set_loaded();
+        page.clear_wal_tag();
         Ok(())
     }
 
@@ -2203,6 +2203,7 @@ pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset:
         let buffer = buffer_pool.get_page();
         let buffer = Arc::new(buffer);
         page.set_loaded();
+        page.clear_wal_tag();
         page.get().contents = Some(PageContent::new(offset, buffer));
     }
     page
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index ac9eff3e1..28e55bfbd 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -449,9 +449,8 @@ struct OngoingCheckpoint {
     inflight_reads: Vec,
     /// Array of atomic counters representing write operations that are currently in flight.
     inflight_writes: Vec>,
-    /// List of all page_id + frame_id combinations to be backfilled, with a boolean
-    /// to denote that a cached page was used
-    pages_to_checkpoint: Vec<(u64, u64, bool)>,
+    /// List of all page_id + frame_id combinations to be backfilled
+    pages_to_checkpoint: Vec<(u64, u64)>,
 }
 
 impl OngoingCheckpoint {
@@ -1073,10 +1072,11 @@ impl Wal for WalFile {
         page.set_locked();
         let frame = page.clone();
         let page_idx = page.get().id;
-        let seq = self.header.checkpoint_seq;
+        let header = self.get_shared().wal_header.clone();
         let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
             let Ok((buf, bytes_read)) = res else {
                 page.clear_locked();
+                page.clear_wal_tag();
                 return;
             };
             let buf_len = buf.len();
@@ -1086,6 +1086,7 @@ impl Wal for WalFile {
             );
             let cloned = frame.clone();
             finish_read_page(page.get().id, buf, cloned);
+            let seq = header.lock().checkpoint_seq;
             frame.set_wal_tag(frame_id, seq);
         });
         let shared = self.get_shared();
@@ -1295,8 +1296,8 @@ impl Wal for WalFile {
         tracing::debug!(frame_id, offset, page_id);
         let (c, checksums) = {
             let shared = self.get_shared();
-            let header = shared.wal_header.clone();
-            let header = header.lock();
+            let shared_header = shared.wal_header.clone();
+            let header = shared.wal_header.lock();
             let checksums = self.last_checksum;
             let page_content = page.get_contents();
             let page_buf = page_content.as_ptr();
@@ -1316,7 +1317,6 @@ impl Wal for WalFile {
                 page_buf
             };
 
-            let seq = header.checkpoint_seq;
             let (frame_checksums, frame_bytes) = prepare_wal_frame(
                 &self.buffer_pool,
                 &header,
@@ -1340,6 +1340,7 @@ impl Wal for WalFile {
                 );
 
                 page.clear_dirty();
+                let seq = shared_header.lock().checkpoint_seq;
                 page.set_wal_tag(frame_id, seq);
             }
         });
@@ -1805,7 +1806,7 @@ impl WalFile {
                 f >= self.ongoing_checkpoint.min_frame
                     && f <= self.ongoing_checkpoint.max_frame
             }) {
-                list.push((page_id, frame, false));
+                list.push((page_id, frame));
             }
         }
         // sort by frame_id for read locality
@@ -1845,40 +1846,56 @@ impl WalFile {
                     let seq = self.header.checkpoint_seq;
                     // Issue reads until we hit limits
                     while self.ongoing_checkpoint.should_issue_reads() {
-                        let (page_id, target_frame, _) =
-                            self.ongoing_checkpoint.pages_to_checkpoint
-                                [self.ongoing_checkpoint.current_page as usize];
+                        let (page_id, target_frame) = self.ongoing_checkpoint.pages_to_checkpoint
+                            [self.ongoing_checkpoint.current_page as usize];
 
-                        // Try cache first, if enabled
-                        if let Some(cached_page) =
-                            pager.cache_get_for_checkpoint(page_id as usize, target_frame, seq)?
-                        {
-                            let contents = cached_page.get_contents();
-                            let buffer = contents.buffer.clone();
-                            // TODO: remove this eventually to actually benefit from the
-                            // performance.. for now we assert that the cached page has the
-                            // exact contents as one read from the WAL.
-                            #[cfg(debug_assertions)]
-                            {
-                                let mut raw =
-                                    vec![0u8; self.page_size() as usize + WAL_FRAME_HEADER_SIZE];
-                                self.io.wait_for_completion(
-                                    self.read_frame_raw(target_frame, &mut raw)?,
-                                )?;
-                                let (_, wal_page) = sqlite3_ondisk::parse_wal_frame_header(&raw);
-                                let cached = cached_page.get_contents().buffer.as_slice();
-                                turso_assert!(wal_page == cached, "cache fast-path returned wrong content for page {page_id} frame {target_frame}");
+                        // dont use cached pages unless we hold the write lock, preventing them
+                        // from being touched from under us.
+                        if matches!(self.checkpoint_guard, Some(CheckpointLocks::Writer { .. })) {
+                            if let Some(cached_page) = pager.cache_get_for_checkpoint(
+                                page_id as usize,
+                                target_frame,
+                                seq,
+                            )? {
+                                let contents = cached_page.get_contents().buffer.clone();
+                                // to avoid TOCTOU issues with using cached pages, we snapshot the contents and pay the memcpy
+                                // instead of risking the page changing out from under us.
+                                let buffer = Arc::new(self.buffer_pool.get_page());
+                                buffer.as_mut_slice()[..contents.len()]
+                                    .copy_from_slice(contents.as_slice());
+                                // TODO: remove this eventually to actually benefit from the
+                                // performance.. for now we assert that the cached page has the
+                                // exact contents as one read from the WAL.
+                                #[cfg(debug_assertions)]
+                                {
+                                    let mut raw = vec![
+                                        0u8;
+                                        self.page_size() as usize
+                                            + WAL_FRAME_HEADER_SIZE
+                                    ];
+                                    self.io.wait_for_completion(
+                                        self.read_frame_raw(target_frame, &mut raw)?,
+                                    )?;
+                                    let (_, wal_page) =
+                                        sqlite3_ondisk::parse_wal_frame_header(&raw);
+                                    let cached = buffer.as_slice();
+                                    turso_assert!(wal_page == cached, "cache fast-path returned wrong content for page {page_id} frame {target_frame}");
+                                }
+                                turso_assert!(
+                                    cached_page.is_valid_for_checkpoint(target_frame, seq),
+                                    " should still be valid after snapshotting the page"
+                                );
+                                self.ongoing_checkpoint
+                                    .pending_writes
+                                    .insert(page_id as usize, buffer);
+
+                                // signify that a cached page was used, so it can be unpinned
+                                self.ongoing_checkpoint.pages_to_checkpoint
+                                    [self.ongoing_checkpoint.current_page as usize] =
+                                    (page_id, target_frame);
+                                self.ongoing_checkpoint.current_page += 1;
+                                continue;
                             }
-                            self.ongoing_checkpoint
-                                .pending_writes
-                                .insert(page_id as usize, buffer);
-
-                            // signify that a cached page was used, so it can be unpinned
-                            self.ongoing_checkpoint.pages_to_checkpoint
-                                [self.ongoing_checkpoint.current_page as usize] =
-                                (page_id, target_frame, true);
-                            self.ongoing_checkpoint.current_page += 1;
-                            continue;
                         }
                         // Issue read if page wasn't found in the page cache or doesnt meet
                         // the frame requirements
@@ -1906,20 +1923,6 @@ impl WalFile {
                     if !completions.is_empty() {
                         io_yield_many!(completions);
                     } else if self.ongoing_checkpoint.complete() {
-                        // if we are completely done backfilling, we need to unpin any pages we used from the page cache.
-                        for (page_id, _, cached) in
-                            self.ongoing_checkpoint.pages_to_checkpoint.iter()
-                        {
-                            if *cached {
-                                let page = pager.cache_get((*page_id) as usize)?;
-                                turso_assert!(
-                                    page.is_some(),
-                                    "page should still exist in the page cache"
-                                );
-                                // if we used a cached page, unpin it
-                                page.map(|p| p.try_unpin());
-                            }
-                        }
                         self.ongoing_checkpoint.state = CheckpointState::Done;
                     }
                 }
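
Note: the hunks above replace the pin-based checkpoint cache fast path with a snapshot-and-revalidate scheme: a cached page is only used while the checkpointer holds the WAL write lock, its buffer is copied (paying the memcpy), and its (frame, seq) wal tag is checked again afterwards. The standalone sketch below illustrates that pattern with hypothetical types (WalTag, CachedPage are illustrative, not this crate's API); it returns None on a failed re-check, whereas the patch, holding the writer lock, simply asserts that the tag is still valid.

use std::sync::Mutex;

/// Hypothetical stand-in for a page's WAL tag: the frame it was read from
/// plus the checkpoint sequence number that was current at that time.
#[derive(Clone, Copy, PartialEq, Eq)]
struct WalTag {
    frame: u64,
    seq: u32,
}

/// Hypothetical cached page: the tag is cleared (set to None) whenever the
/// page is dirtied or reloaded, mirroring the clear_wal_tag() calls above.
struct CachedPage {
    tag: Mutex<Option<WalTag>>,
    buf: Mutex<Vec<u8>>,
}

impl CachedPage {
    fn is_valid_for_checkpoint(&self, target: WalTag) -> bool {
        *self.tag.lock().unwrap() == Some(target)
    }

    /// Returns a private copy of the page contents, or None if the page is not
    /// (or no longer) a valid image of `target`.
    fn snapshot_for_checkpoint(&self, target: WalTag) -> Option<Vec<u8>> {
        if !self.is_valid_for_checkpoint(target) {
            return None;
        }
        // Pay the memcpy: the backfill write uses this private copy, so later
        // changes to the cached page cannot race with the checkpoint I/O.
        let copy = self.buf.lock().unwrap().clone();
        // Re-validate after copying; if the tag was cleared or replaced in the
        // meantime, the snapshot may be stale and must not be used.
        if self.is_valid_for_checkpoint(target) {
            Some(copy)
        } else {
            None
        }
    }
}

fn main() {
    let target = WalTag { frame: 7, seq: 1 };
    let page = CachedPage {
        tag: Mutex::new(Some(target)),
        buf: Mutex::new(vec![0xAB; 4096]),
    };
    assert!(page.snapshot_for_checkpoint(target).is_some());

    // Dirtying the page clears the tag, so the fast path no longer applies
    // and the checkpointer would read the frame from the WAL instead.
    *page.tag.lock().unwrap() = None;
    assert!(page.snapshot_for_checkpoint(target).is_none());
}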