fix wal insert frame raw API

- we need to properly mark pages as dirty after insertion
This commit is contained in:
Nikita Sivukhin
2025-07-23 21:09:09 +04:00
parent 435ca7fe7a
commit 4a80306705
4 changed files with 59 additions and 29 deletions

View File

@@ -840,13 +840,13 @@ impl Connection {
/// Finish WAL session by ending read+write transaction taken in the [Self::wal_insert_begin] method
/// All frames written after last commit frame (db_size > 0) within the session will be rolled back
#[cfg(feature = "fs")]
pub fn wal_insert_end(&self) -> Result<()> {
pub fn wal_insert_end(self: &Arc<Connection>) -> Result<()> {
let pager = self.pager.borrow();
let mut wal = pager.wal.borrow_mut();
// remove all non-commited changes in case if WAL session left some suffix without commit frame
wal.rollback()
.expect("wal must be able to rollback any non-commited changes");
// remove all non-commited changes in case if WAL session left some suffix without commit frame
pager.rollback(false, self).expect("rollback must succeed");
let wal = pager.wal.borrow_mut();
wal.end_write_tx();
wal.end_read_tx();
Ok(())

View File

@@ -3,7 +3,9 @@ use crate::storage::btree::BTreePageInner;
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::header_accessor;
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType};
use crate::storage::sqlite3_ondisk::{
self, parse_wal_frame_header, DatabaseHeader, PageContent, PageType,
};
use crate::storage::wal::{CheckpointResult, Wal};
use crate::types::IOResult;
use crate::util::IOExt as _;
@@ -1018,7 +1020,29 @@ impl Pager {
#[instrument(skip_all, level = Level::DEBUG)]
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<()> {
let mut wal = self.wal.borrow_mut();
wal.write_frame_raw(self.buffer_pool.clone(), frame_no as u64, frame)
let (header, raw_page) = parse_wal_frame_header(frame);
wal.write_frame_raw(
self.buffer_pool.clone(),
frame_no as u64,
header.page_number as u64,
header.db_size as u64,
raw_page,
)?;
if let Some(page) = self.cache_get(header.page_number as usize) {
let content = page.get_contents();
content.as_ptr().copy_from_slice(raw_page);
self.add_dirty(header.page_number as usize, &page);
}
if header.db_size > 0 {
for page_id in self.dirty_pages.borrow().iter() {
let page_key = PageCacheKey::new(*page_id);
let mut cache = self.page_cache.write();
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
page.clear_dirty();
}
self.dirty_pages.borrow_mut().clear();
}
Ok(())
}
#[instrument(skip_all, level = Level::DEBUG, name = "pager_checkpoint",)]

View File

@@ -1546,21 +1546,23 @@ pub fn begin_read_wal_frame(
Ok(c)
}
pub fn parse_wal_frame_header(frame: &[u8]) -> WalFrameHeader {
pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) {
let page_number = u32::from_be_bytes(frame[0..4].try_into().unwrap());
let db_size = u32::from_be_bytes(frame[4..8].try_into().unwrap());
let salt_1 = u32::from_be_bytes(frame[8..12].try_into().unwrap());
let salt_2 = u32::from_be_bytes(frame[12..16].try_into().unwrap());
let checksum_1 = u32::from_be_bytes(frame[16..20].try_into().unwrap());
let checksum_2 = u32::from_be_bytes(frame[20..24].try_into().unwrap());
WalFrameHeader {
let header = WalFrameHeader {
page_number,
db_size,
salt_1,
salt_2,
checksum_1,
checksum_2,
}
};
let page = &frame[WAL_FRAME_HEADER_SIZE..];
(header, page)
}
pub fn prepare_wal_frame(

View File

@@ -20,8 +20,8 @@ use crate::fast_lock::SpinLock;
use crate::io::{File, IO};
use crate::result::LimboResult;
use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, parse_wal_frame_header,
prepare_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame,
WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
};
use crate::types::IOResult;
use crate::{turso_assert, Buffer, LimboError, Result};
@@ -223,7 +223,9 @@ pub trait Wal {
&mut self,
buffer_pool: Arc<BufferPool>,
frame_id: u64,
frame: &[u8],
page_id: u64,
db_size: u64,
page: &[u8],
) -> Result<()>;
/// Write a frame to the WAL.
@@ -296,7 +298,9 @@ impl Wal for DummyWAL {
&mut self,
_buffer_pool: Arc<BufferPool>,
_frame_id: u64,
_frame: &[u8],
_page_id: u64,
_db_size: u64,
_page: &[u8],
) -> Result<()> {
todo!();
}
@@ -659,15 +663,16 @@ impl Wal for WalFile {
&mut self,
buffer_pool: Arc<BufferPool>,
frame_id: u64,
frame: &[u8],
page_id: u64,
db_size: u64,
page: &[u8],
) -> Result<()> {
tracing::debug!("write_raw_frame({})", frame_id);
let expected_frame_len = WAL_FRAME_HEADER_SIZE + self.page_size() as usize;
if frame.len() != expected_frame_len {
if page.len() != self.page_size() as usize {
return Err(LimboError::InvalidArgument(format!(
"unexpected frame size: got={}, expected={}",
frame.len(),
expected_frame_len
"unexpected page size in frame: got={}, expected={}",
page.len(),
self.page_size(),
)));
}
if frame_id > self.max_frame + 1 {
@@ -681,7 +686,7 @@ impl Wal for WalFile {
// just validate if page content from the frame matches frame in the WAL
let offset = self.frame_offset(frame_id);
let conflict = Arc::new(Cell::new(false));
let (frame_ptr, frame_len) = (frame.as_ptr(), frame.len());
let (page_ptr, page_len) = (page.as_ptr(), page.len());
let complete = Box::new({
let conflict = conflict.clone();
move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
@@ -691,8 +696,8 @@ impl Wal for WalFile {
bytes_read == buf_len as i32,
"read({bytes_read}) != expected({buf_len})"
);
let frame = unsafe { std::slice::from_raw_parts(frame_ptr, frame_len) };
if buf.as_slice() != &frame[WAL_FRAME_HEADER_SIZE..] {
let page = unsafe { std::slice::from_raw_parts(page_ptr, page_len) };
if buf.as_slice() != page {
conflict.set(true);
}
}
@@ -719,20 +724,19 @@ impl Wal for WalFile {
let header = shared.wal_header.clone();
let header = header.lock();
let checksums = self.last_checksum;
let frame_header = parse_wal_frame_header(frame);
let (checksums, frame_bytes) = prepare_wal_frame(
&header,
checksums,
header.page_size,
frame_header.page_number,
frame_header.db_size,
&frame[WAL_FRAME_HEADER_SIZE..],
page_id as u32,
db_size as u32,
page,
);
let c = Arc::new(Completion::new_write(|_| {}));
let c = shared.file.pwrite(offset, frame_bytes, c)?;
self.io.wait_for_completion(c)?;
self.complete_append_frame(frame_header.page_number as u64, frame_id, checksums);
if frame_header.db_size > 0 {
self.complete_append_frame(page_id, frame_id, checksums);
if db_size > 0 {
self.finish_append_frames_commit()?;
}
Ok(())