diff --git a/core/lib.rs b/core/lib.rs index a83605459..2c7c0731a 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -840,13 +840,13 @@ impl Connection { /// Finish WAL session by ending read+write transaction taken in the [Self::wal_insert_begin] method /// All frames written after last commit frame (db_size > 0) within the session will be rolled back #[cfg(feature = "fs")] - pub fn wal_insert_end(&self) -> Result<()> { + pub fn wal_insert_end(self: &Arc) -> Result<()> { let pager = self.pager.borrow(); - let mut wal = pager.wal.borrow_mut(); - // remove all non-commited changes in case if WAL session left some suffix without commit frame - wal.rollback() - .expect("wal must be able to rollback any non-commited changes"); + // remove all non-commited changes in case if WAL session left some suffix without commit frame + pager.rollback(false, self).expect("rollback must succeed"); + + let wal = pager.wal.borrow_mut(); wal.end_write_tx(); wal.end_read_tx(); Ok(()) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 8df34199b..4cf6f14d3 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -3,7 +3,9 @@ use crate::storage::btree::BTreePageInner; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::header_accessor; -use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType}; +use crate::storage::sqlite3_ondisk::{ + self, parse_wal_frame_header, DatabaseHeader, PageContent, PageType, +}; use crate::storage::wal::{CheckpointResult, Wal}; use crate::types::IOResult; use crate::util::IOExt as _; @@ -1018,7 +1020,29 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<()> { let mut wal = self.wal.borrow_mut(); - wal.write_frame_raw(self.buffer_pool.clone(), frame_no as u64, frame) + let (header, raw_page) = parse_wal_frame_header(frame); + wal.write_frame_raw( + self.buffer_pool.clone(), + frame_no as u64, + header.page_number as u64, + header.db_size as u64, + raw_page, + )?; + if let Some(page) = self.cache_get(header.page_number as usize) { + let content = page.get_contents(); + content.as_ptr().copy_from_slice(raw_page); + self.add_dirty(header.page_number as usize, &page); + } + if header.db_size > 0 { + for page_id in self.dirty_pages.borrow().iter() { + let page_key = PageCacheKey::new(*page_id); + let mut cache = self.page_cache.write(); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + page.clear_dirty(); + } + self.dirty_pages.borrow_mut().clear(); + } + Ok(()) } #[instrument(skip_all, level = Level::DEBUG, name = "pager_checkpoint",)] diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index d0d5249ca..d75571393 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1546,21 +1546,23 @@ pub fn begin_read_wal_frame( Ok(c) } -pub fn parse_wal_frame_header(frame: &[u8]) -> WalFrameHeader { +pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) { let page_number = u32::from_be_bytes(frame[0..4].try_into().unwrap()); let db_size = u32::from_be_bytes(frame[4..8].try_into().unwrap()); let salt_1 = u32::from_be_bytes(frame[8..12].try_into().unwrap()); let salt_2 = u32::from_be_bytes(frame[12..16].try_into().unwrap()); let checksum_1 = u32::from_be_bytes(frame[16..20].try_into().unwrap()); let checksum_2 = u32::from_be_bytes(frame[20..24].try_into().unwrap()); - WalFrameHeader { + let header = WalFrameHeader { page_number, db_size, salt_1, salt_2, checksum_1, checksum_2, - } + }; + let page = &frame[WAL_FRAME_HEADER_SIZE..]; + (header, page) } pub fn prepare_wal_frame( diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 63297f7c2..0c29ac161 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -20,8 +20,8 @@ use crate::fast_lock::SpinLock; use crate::io::{File, IO}; use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::{ - begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, parse_wal_frame_header, - prepare_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, + begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame, + WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::types::IOResult; use crate::{turso_assert, Buffer, LimboError, Result}; @@ -223,7 +223,9 @@ pub trait Wal { &mut self, buffer_pool: Arc, frame_id: u64, - frame: &[u8], + page_id: u64, + db_size: u64, + page: &[u8], ) -> Result<()>; /// Write a frame to the WAL. @@ -296,7 +298,9 @@ impl Wal for DummyWAL { &mut self, _buffer_pool: Arc, _frame_id: u64, - _frame: &[u8], + _page_id: u64, + _db_size: u64, + _page: &[u8], ) -> Result<()> { todo!(); } @@ -659,15 +663,16 @@ impl Wal for WalFile { &mut self, buffer_pool: Arc, frame_id: u64, - frame: &[u8], + page_id: u64, + db_size: u64, + page: &[u8], ) -> Result<()> { tracing::debug!("write_raw_frame({})", frame_id); - let expected_frame_len = WAL_FRAME_HEADER_SIZE + self.page_size() as usize; - if frame.len() != expected_frame_len { + if page.len() != self.page_size() as usize { return Err(LimboError::InvalidArgument(format!( - "unexpected frame size: got={}, expected={}", - frame.len(), - expected_frame_len + "unexpected page size in frame: got={}, expected={}", + page.len(), + self.page_size(), ))); } if frame_id > self.max_frame + 1 { @@ -681,7 +686,7 @@ impl Wal for WalFile { // just validate if page content from the frame matches frame in the WAL let offset = self.frame_offset(frame_id); let conflict = Arc::new(Cell::new(false)); - let (frame_ptr, frame_len) = (frame.as_ptr(), frame.len()); + let (page_ptr, page_len) = (page.as_ptr(), page.len()); let complete = Box::new({ let conflict = conflict.clone(); move |buf: Arc>, bytes_read: i32| { @@ -691,8 +696,8 @@ impl Wal for WalFile { bytes_read == buf_len as i32, "read({bytes_read}) != expected({buf_len})" ); - let frame = unsafe { std::slice::from_raw_parts(frame_ptr, frame_len) }; - if buf.as_slice() != &frame[WAL_FRAME_HEADER_SIZE..] { + let page = unsafe { std::slice::from_raw_parts(page_ptr, page_len) }; + if buf.as_slice() != page { conflict.set(true); } } @@ -719,20 +724,19 @@ impl Wal for WalFile { let header = shared.wal_header.clone(); let header = header.lock(); let checksums = self.last_checksum; - let frame_header = parse_wal_frame_header(frame); let (checksums, frame_bytes) = prepare_wal_frame( &header, checksums, header.page_size, - frame_header.page_number, - frame_header.db_size, - &frame[WAL_FRAME_HEADER_SIZE..], + page_id as u32, + db_size as u32, + page, ); let c = Arc::new(Completion::new_write(|_| {})); let c = shared.file.pwrite(offset, frame_bytes, c)?; self.io.wait_for_completion(c)?; - self.complete_append_frame(frame_header.page_number as u64, frame_id, checksums); - if frame_header.db_size > 0 { + self.complete_append_frame(page_id, frame_id, checksums); + if db_size > 0 { self.finish_append_frames_commit()?; } Ok(())