mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-18 09:04:19 +01:00
Merge 'WAL insert: mark pages as dirty' from Nikita Sivukhin
WAL insert API introduced in the #2231 works incorrectly as it never mark inserted pages as dirty. This PR fixes this issue and also add simple fuzz test which fails without fixes. Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com> Closes #2245
This commit is contained in:
10
core/lib.rs
10
core/lib.rs
@@ -840,13 +840,13 @@ impl Connection {
|
||||
/// Finish WAL session by ending read+write transaction taken in the [Self::wal_insert_begin] method
|
||||
/// All frames written after last commit frame (db_size > 0) within the session will be rolled back
|
||||
#[cfg(feature = "fs")]
|
||||
pub fn wal_insert_end(&self) -> Result<()> {
|
||||
pub fn wal_insert_end(self: &Arc<Connection>) -> Result<()> {
|
||||
let pager = self.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
// remove all non-commited changes in case if WAL session left some suffix without commit frame
|
||||
wal.rollback()
|
||||
.expect("wal must be able to rollback any non-commited changes");
|
||||
|
||||
// remove all non-commited changes in case if WAL session left some suffix without commit frame
|
||||
pager.rollback(false, self).expect("rollback must succeed");
|
||||
|
||||
let wal = pager.wal.borrow_mut();
|
||||
wal.end_write_tx();
|
||||
wal.end_read_tx();
|
||||
Ok(())
|
||||
|
||||
@@ -3,7 +3,9 @@ use crate::storage::btree::BTreePageInner;
|
||||
use crate::storage::buffer_pool::BufferPool;
|
||||
use crate::storage::database::DatabaseStorage;
|
||||
use crate::storage::header_accessor;
|
||||
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType};
|
||||
use crate::storage::sqlite3_ondisk::{
|
||||
self, parse_wal_frame_header, DatabaseHeader, PageContent, PageType,
|
||||
};
|
||||
use crate::storage::wal::{CheckpointResult, Wal};
|
||||
use crate::types::IOResult;
|
||||
use crate::util::IOExt as _;
|
||||
@@ -1018,7 +1020,33 @@ impl Pager {
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<()> {
|
||||
let mut wal = self.wal.borrow_mut();
|
||||
wal.write_frame_raw(self.buffer_pool.clone(), frame_no as u64, frame)
|
||||
let (header, raw_page) = parse_wal_frame_header(frame);
|
||||
wal.write_frame_raw(
|
||||
self.buffer_pool.clone(),
|
||||
frame_no as u64,
|
||||
header.page_number as u64,
|
||||
header.db_size as u64,
|
||||
raw_page,
|
||||
)?;
|
||||
if let Some(page) = self.cache_get(header.page_number as usize) {
|
||||
let content = page.get_contents();
|
||||
content.as_ptr().copy_from_slice(raw_page);
|
||||
turso_assert!(
|
||||
page.get().id == header.page_number as usize,
|
||||
"page has unexpected id"
|
||||
);
|
||||
self.add_dirty(&page);
|
||||
}
|
||||
if header.is_commit_frame() {
|
||||
for page_id in self.dirty_pages.borrow().iter() {
|
||||
let page_key = PageCacheKey::new(*page_id);
|
||||
let mut cache = self.page_cache.write();
|
||||
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
page.clear_dirty();
|
||||
}
|
||||
self.dirty_pages.borrow_mut().clear();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG, name = "pager_checkpoint",)]
|
||||
|
||||
@@ -245,6 +245,12 @@ pub struct WalFrameHeader {
|
||||
pub(crate) checksum_2: u32,
|
||||
}
|
||||
|
||||
impl WalFrameHeader {
|
||||
pub fn is_commit_frame(&self) -> bool {
|
||||
self.db_size > 0
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DatabaseHeader {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@@ -1546,21 +1552,23 @@ pub fn begin_read_wal_frame(
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
pub fn parse_wal_frame_header(frame: &[u8]) -> WalFrameHeader {
|
||||
pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) {
|
||||
let page_number = u32::from_be_bytes(frame[0..4].try_into().unwrap());
|
||||
let db_size = u32::from_be_bytes(frame[4..8].try_into().unwrap());
|
||||
let salt_1 = u32::from_be_bytes(frame[8..12].try_into().unwrap());
|
||||
let salt_2 = u32::from_be_bytes(frame[12..16].try_into().unwrap());
|
||||
let checksum_1 = u32::from_be_bytes(frame[16..20].try_into().unwrap());
|
||||
let checksum_2 = u32::from_be_bytes(frame[20..24].try_into().unwrap());
|
||||
WalFrameHeader {
|
||||
let header = WalFrameHeader {
|
||||
page_number,
|
||||
db_size,
|
||||
salt_1,
|
||||
salt_2,
|
||||
checksum_1,
|
||||
checksum_2,
|
||||
}
|
||||
};
|
||||
let page = &frame[WAL_FRAME_HEADER_SIZE..];
|
||||
(header, page)
|
||||
}
|
||||
|
||||
pub fn prepare_wal_frame(
|
||||
|
||||
@@ -20,8 +20,8 @@ use crate::fast_lock::SpinLock;
|
||||
use crate::io::{File, IO};
|
||||
use crate::result::LimboResult;
|
||||
use crate::storage::sqlite3_ondisk::{
|
||||
begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, parse_wal_frame_header,
|
||||
prepare_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
|
||||
begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame,
|
||||
WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
|
||||
};
|
||||
use crate::types::IOResult;
|
||||
use crate::{turso_assert, Buffer, LimboError, Result};
|
||||
@@ -223,7 +223,9 @@ pub trait Wal {
|
||||
&mut self,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
frame_id: u64,
|
||||
frame: &[u8],
|
||||
page_id: u64,
|
||||
db_size: u64,
|
||||
page: &[u8],
|
||||
) -> Result<()>;
|
||||
|
||||
/// Write a frame to the WAL.
|
||||
@@ -296,7 +298,9 @@ impl Wal for DummyWAL {
|
||||
&mut self,
|
||||
_buffer_pool: Arc<BufferPool>,
|
||||
_frame_id: u64,
|
||||
_frame: &[u8],
|
||||
_page_id: u64,
|
||||
_db_size: u64,
|
||||
_page: &[u8],
|
||||
) -> Result<()> {
|
||||
todo!();
|
||||
}
|
||||
@@ -659,15 +663,16 @@ impl Wal for WalFile {
|
||||
&mut self,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
frame_id: u64,
|
||||
frame: &[u8],
|
||||
page_id: u64,
|
||||
db_size: u64,
|
||||
page: &[u8],
|
||||
) -> Result<()> {
|
||||
tracing::debug!("write_raw_frame({})", frame_id);
|
||||
let expected_frame_len = WAL_FRAME_HEADER_SIZE + self.page_size() as usize;
|
||||
if frame.len() != expected_frame_len {
|
||||
if page.len() != self.page_size() as usize {
|
||||
return Err(LimboError::InvalidArgument(format!(
|
||||
"unexpected frame size: got={}, expected={}",
|
||||
frame.len(),
|
||||
expected_frame_len
|
||||
"unexpected page size in frame: got={}, expected={}",
|
||||
page.len(),
|
||||
self.page_size(),
|
||||
)));
|
||||
}
|
||||
if frame_id > self.max_frame + 1 {
|
||||
@@ -681,7 +686,7 @@ impl Wal for WalFile {
|
||||
// just validate if page content from the frame matches frame in the WAL
|
||||
let offset = self.frame_offset(frame_id);
|
||||
let conflict = Arc::new(Cell::new(false));
|
||||
let (frame_ptr, frame_len) = (frame.as_ptr(), frame.len());
|
||||
let (page_ptr, page_len) = (page.as_ptr(), page.len());
|
||||
let complete = Box::new({
|
||||
let conflict = conflict.clone();
|
||||
move |buf: Arc<RefCell<Buffer>>, bytes_read: i32| {
|
||||
@@ -691,8 +696,8 @@ impl Wal for WalFile {
|
||||
bytes_read == buf_len as i32,
|
||||
"read({bytes_read}) != expected({buf_len})"
|
||||
);
|
||||
let frame = unsafe { std::slice::from_raw_parts(frame_ptr, frame_len) };
|
||||
if buf.as_slice() != &frame[WAL_FRAME_HEADER_SIZE..] {
|
||||
let page = unsafe { std::slice::from_raw_parts(page_ptr, page_len) };
|
||||
if buf.as_slice() != page {
|
||||
conflict.set(true);
|
||||
}
|
||||
}
|
||||
@@ -719,20 +724,19 @@ impl Wal for WalFile {
|
||||
let header = shared.wal_header.clone();
|
||||
let header = header.lock();
|
||||
let checksums = self.last_checksum;
|
||||
let frame_header = parse_wal_frame_header(frame);
|
||||
let (checksums, frame_bytes) = prepare_wal_frame(
|
||||
&header,
|
||||
checksums,
|
||||
header.page_size,
|
||||
frame_header.page_number,
|
||||
frame_header.db_size,
|
||||
&frame[WAL_FRAME_HEADER_SIZE..],
|
||||
page_id as u32,
|
||||
db_size as u32,
|
||||
page,
|
||||
);
|
||||
let c = Arc::new(Completion::new_write(|_| {}));
|
||||
let c = shared.file.pwrite(offset, frame_bytes, c)?;
|
||||
self.io.wait_for_completion(c)?;
|
||||
self.complete_append_frame(frame_header.page_number as u64, frame_id, checksums);
|
||||
if frame_header.db_size > 0 {
|
||||
self.complete_append_frame(page_id, frame_id, checksums);
|
||||
if db_size > 0 {
|
||||
self.finish_append_frames_commit()?;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use rand::{rng, RngCore};
|
||||
use rand::{rng, RngCore, SeedableRng};
|
||||
use rand_chacha::ChaCha8Rng;
|
||||
use rusqlite::params;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
@@ -238,6 +239,15 @@ pub(crate) fn limbo_exec_rows_error(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn rng_from_time() -> (ChaCha8Rng, u64) {
|
||||
let seed = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
let rng = ChaCha8Rng::seed_from_u64(seed);
|
||||
(rng, seed)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::vec;
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use rand::{RngCore, SeedableRng};
|
||||
use rand_chacha::ChaCha8Rng;
|
||||
use rusqlite::types::Value;
|
||||
|
||||
use crate::common::{limbo_exec_rows, TempDatabase};
|
||||
use crate::common::{limbo_exec_rows, rng_from_time, TempDatabase};
|
||||
|
||||
#[test]
|
||||
fn test_wal_frame_count() {
|
||||
@@ -142,3 +144,61 @@ fn test_wal_frame_far_away_write() {
|
||||
db1.io.wait_for_completion(c).unwrap();
|
||||
assert!(conn2.wal_insert_frame(5, &frame).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wal_frame_api_no_schema_changes_fuzz() {
|
||||
let (mut rng, _) = rng_from_time();
|
||||
for _ in 0..4 {
|
||||
let db1 = TempDatabase::new_empty(false);
|
||||
let conn1 = db1.connect_limbo();
|
||||
let db2 = TempDatabase::new_empty(false);
|
||||
let conn2 = db2.connect_limbo();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
|
||||
.unwrap();
|
||||
conn2
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
|
||||
.unwrap();
|
||||
|
||||
let seed = rng.next_u64();
|
||||
let mut rng = ChaCha8Rng::seed_from_u64(seed);
|
||||
println!("SEED: {seed}");
|
||||
|
||||
let (mut size, mut synced_frame) = (0, conn2.wal_frame_count().unwrap());
|
||||
let mut commit_frames = vec![conn1.wal_frame_count().unwrap()];
|
||||
for _ in 0..256 {
|
||||
if rng.next_u32() % 10 != 0 {
|
||||
let key = rng.next_u32();
|
||||
let length = rng.next_u32() % (4 * 4096);
|
||||
let query = format!("INSERT INTO t VALUES ({key}, randomblob({length}))");
|
||||
conn1.execute(&query).unwrap();
|
||||
commit_frames.push(conn1.wal_frame_count().unwrap());
|
||||
} else {
|
||||
let last_frame = conn1.wal_frame_count().unwrap();
|
||||
let next_frame =
|
||||
synced_frame + (rng.next_u32() as u64 % (last_frame - synced_frame + 1));
|
||||
let mut frame = [0u8; 24 + 4096];
|
||||
conn2.wal_insert_begin().unwrap();
|
||||
for frame_no in (synced_frame + 1)..=next_frame {
|
||||
let c = conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap();
|
||||
db1.io.wait_for_completion(c).unwrap();
|
||||
conn2.wal_insert_frame(frame_no as u32, &frame[..]).unwrap();
|
||||
}
|
||||
conn2.wal_insert_end().unwrap();
|
||||
for (i, committed) in commit_frames.iter().enumerate() {
|
||||
if *committed <= next_frame {
|
||||
size = size.max(i);
|
||||
synced_frame = *committed;
|
||||
}
|
||||
}
|
||||
if rng.next_u32() % 10 == 0 {
|
||||
synced_frame = rng.next_u32() as u64 % synced_frame;
|
||||
}
|
||||
assert_eq!(
|
||||
limbo_exec_rows(&db2, &conn2, "SELECT COUNT(*) FROM t"),
|
||||
vec![vec![Value::Integer(size as i64)]]
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,21 +10,12 @@ mod tests {
|
||||
use rusqlite::params;
|
||||
|
||||
use crate::{
|
||||
common::{limbo_exec_rows, sqlite_exec_rows, TempDatabase},
|
||||
common::{limbo_exec_rows, rng_from_time, sqlite_exec_rows, TempDatabase},
|
||||
fuzz::grammar_generator::{const_str, rand_int, rand_str, GrammarGenerator},
|
||||
};
|
||||
|
||||
use super::grammar_generator::SymbolHandle;
|
||||
|
||||
fn rng_from_time() -> (ChaCha8Rng, u64) {
|
||||
let seed = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
let rng = ChaCha8Rng::seed_from_u64(seed);
|
||||
(rng, seed)
|
||||
}
|
||||
|
||||
/// [See this issue for more info](https://github.com/tursodatabase/turso/issues/1763)
|
||||
#[test]
|
||||
pub fn fuzz_failure_issue_1763() {
|
||||
|
||||
Reference in New Issue
Block a user