mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-18 17:14:20 +01:00
Merge 'WAL record db_size frame on commit last frame' from Pere Diaz Bou
`db_size` is `>0` in case of last frame written of a transaction. This is necessary as we need to know -- while recovering wal contents -- that we have read a transaction fully instead of treating every frame as its own transaction. Closes #1866
This commit is contained in:
@@ -750,12 +750,14 @@ impl Pager {
|
|||||||
match state {
|
match state {
|
||||||
FlushState::Start => {
|
FlushState::Start => {
|
||||||
let db_size = header_accessor::get_database_size(self)?;
|
let db_size = header_accessor::get_database_size(self)?;
|
||||||
for page_id in self.dirty_pages.borrow().iter() {
|
for (dirty_page_idx, page_id) in self.dirty_pages.borrow().iter().enumerate() {
|
||||||
|
let is_last_frame = dirty_page_idx == self.dirty_pages.borrow().len() - 1;
|
||||||
let mut cache = self.page_cache.write();
|
let mut cache = self.page_cache.write();
|
||||||
let page_key = PageCacheKey::new(*page_id);
|
let page_key = PageCacheKey::new(*page_id);
|
||||||
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||||
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
|
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
|
||||||
trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
|
trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
|
||||||
|
let db_size = if is_last_frame { db_size } else { 0 };
|
||||||
self.wal.borrow_mut().append_frame(
|
self.wal.borrow_mut().append_frame(
|
||||||
page.clone(),
|
page.clone(),
|
||||||
db_size,
|
db_size,
|
||||||
|
|||||||
@@ -1385,7 +1385,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
|
|||||||
|
|
||||||
let frame_h_page_number =
|
let frame_h_page_number =
|
||||||
u32::from_be_bytes(frame_header_slice[0..4].try_into().unwrap());
|
u32::from_be_bytes(frame_header_slice[0..4].try_into().unwrap());
|
||||||
let _frame_h_db_size = u32::from_be_bytes(frame_header_slice[4..8].try_into().unwrap());
|
let frame_h_db_size = u32::from_be_bytes(frame_header_slice[4..8].try_into().unwrap());
|
||||||
let frame_h_salt_1 = u32::from_be_bytes(frame_header_slice[8..12].try_into().unwrap());
|
let frame_h_salt_1 = u32::from_be_bytes(frame_header_slice[8..12].try_into().unwrap());
|
||||||
let frame_h_salt_2 = u32::from_be_bytes(frame_header_slice[12..16].try_into().unwrap());
|
let frame_h_salt_2 = u32::from_be_bytes(frame_header_slice[12..16].try_into().unwrap());
|
||||||
let frame_h_checksum_1 =
|
let frame_h_checksum_1 =
|
||||||
@@ -1441,14 +1441,16 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
|
|||||||
.lock()
|
.lock()
|
||||||
.push(frame_h_page_number as u64);
|
.push(frame_h_page_number as u64);
|
||||||
|
|
||||||
|
let is_commit_record = frame_h_db_size > 0;
|
||||||
|
if is_commit_record {
|
||||||
|
wfs_data.max_frame.store(frame_idx, Ordering::SeqCst);
|
||||||
|
wfs_data.last_checksum = cumulative_checksum;
|
||||||
|
}
|
||||||
|
|
||||||
frame_idx += 1;
|
frame_idx += 1;
|
||||||
current_offset += WAL_FRAME_HEADER_SIZE + page_size;
|
current_offset += WAL_FRAME_HEADER_SIZE + page_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
wfs_data
|
|
||||||
.max_frame
|
|
||||||
.store(frame_idx.saturating_sub(1), Ordering::SeqCst);
|
|
||||||
wfs_data.last_checksum = cumulative_checksum;
|
|
||||||
wfs_data.loaded.store(true, Ordering::SeqCst);
|
wfs_data.loaded.store(true, Ordering::SeqCst);
|
||||||
});
|
});
|
||||||
let c = Completion::new(CompletionType::Read(ReadCompletion::new(
|
let c = Completion::new(CompletionType::Read(ReadCompletion::new(
|
||||||
|
|||||||
@@ -223,6 +223,10 @@ pub trait Wal {
|
|||||||
) -> Result<Arc<Completion>>;
|
) -> Result<Arc<Completion>>;
|
||||||
|
|
||||||
/// Write a frame to the WAL.
|
/// Write a frame to the WAL.
|
||||||
|
/// db_size is the database size in pages after the transaction finishes.
|
||||||
|
/// db_size > 0 -> last frame written in transaction
|
||||||
|
/// db_size == 0 -> non-last frame written in transaction
|
||||||
|
/// write_counter is the counter we use to track when the I/O operation starts and completes
|
||||||
fn append_frame(
|
fn append_frame(
|
||||||
&mut self,
|
&mut self,
|
||||||
page: PageRef,
|
page: PageRef,
|
||||||
|
|||||||
@@ -1,8 +1,13 @@
|
|||||||
use crate::common::{self, maybe_setup_tracing};
|
use crate::common::{self, maybe_setup_tracing};
|
||||||
use crate::common::{compare_string, do_flush, TempDatabase};
|
use crate::common::{compare_string, do_flush, TempDatabase};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::os::unix::fs::FileExt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use turso_core::{Connection, Row, Statement, StepResult, Value};
|
use turso_core::{Connection, Database, Row, Statement, StepResult, Value};
|
||||||
|
|
||||||
|
const WAL_HEADER_SIZE: usize = 32;
|
||||||
|
const WAL_FRAME_HEADER_SIZE: usize = 24;
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! change_state {
|
macro_rules! change_state {
|
||||||
@@ -638,6 +643,96 @@ fn test_write_concurrent_connections() -> anyhow::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_wal_bad_frame() -> anyhow::Result<()> {
|
||||||
|
maybe_setup_tracing();
|
||||||
|
let _ = env_logger::try_init();
|
||||||
|
let db_path = {
|
||||||
|
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE t1(x)", false);
|
||||||
|
let db_path = tmp_db.path.clone();
|
||||||
|
let conn = tmp_db.connect_limbo();
|
||||||
|
conn.execute("BEGIN")?;
|
||||||
|
conn.execute("CREATE TABLE t2(x)")?;
|
||||||
|
conn.execute("CREATE TABLE t3(x)")?;
|
||||||
|
conn.execute("INSERT INTO t2(x) VALUES (1)")?;
|
||||||
|
conn.execute("INSERT INTO t3(x) VALUES (1)")?;
|
||||||
|
conn.execute("COMMIT")?;
|
||||||
|
run_query_on_row(&tmp_db, &conn, "SELECT count(1) from t2", |row| {
|
||||||
|
let x = row.get::<i64>(0).unwrap();
|
||||||
|
assert_eq!(x, 1);
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
run_query_on_row(&tmp_db, &conn, "SELECT count(1) from t3", |row| {
|
||||||
|
let x = row.get::<i64>(0).unwrap();
|
||||||
|
assert_eq!(x, 1);
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
// Now let's modify last frame record
|
||||||
|
let path = tmp_db.path.clone();
|
||||||
|
let path = path.with_extension("db-wal");
|
||||||
|
let mut file = std::fs::OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.write(true)
|
||||||
|
.open(&path)
|
||||||
|
.unwrap();
|
||||||
|
let offset = WAL_HEADER_SIZE + (WAL_FRAME_HEADER_SIZE + 4096) * 2;
|
||||||
|
let mut buf = [0u8; WAL_FRAME_HEADER_SIZE];
|
||||||
|
file.read_at(&mut buf, offset as u64).unwrap();
|
||||||
|
dbg!(&buf);
|
||||||
|
let db_size = u32::from_be_bytes(buf[4..8].try_into().unwrap());
|
||||||
|
dbg!(offset);
|
||||||
|
assert_eq!(db_size, 4);
|
||||||
|
// let's overwrite size_after to be 0 so that we think transaction never finished
|
||||||
|
buf[4..8].copy_from_slice(&[0, 0, 0, 0]);
|
||||||
|
file.write_at(&buf, offset as u64).unwrap();
|
||||||
|
file.flush().unwrap();
|
||||||
|
|
||||||
|
db_path
|
||||||
|
};
|
||||||
|
{
|
||||||
|
let result = std::panic::catch_unwind(|| {
|
||||||
|
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new().unwrap());
|
||||||
|
let db = Database::open_file_with_flags(
|
||||||
|
io.clone(),
|
||||||
|
db_path.to_str().unwrap(),
|
||||||
|
limbo_core::OpenFlags::default(),
|
||||||
|
false,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let tmp_db = TempDatabase {
|
||||||
|
path: db_path,
|
||||||
|
io,
|
||||||
|
db,
|
||||||
|
};
|
||||||
|
let conn = tmp_db.connect_limbo();
|
||||||
|
run_query_on_row(&tmp_db, &conn, "SELECT count(1) from t2", |row| {
|
||||||
|
let x = row.get::<i64>(0).unwrap();
|
||||||
|
assert_eq!(x, 0);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Err(panic_info) => {
|
||||||
|
let panic_msg = panic_info
|
||||||
|
.downcast_ref::<String>()
|
||||||
|
.map(|s| s.as_str())
|
||||||
|
.or_else(|| panic_info.downcast_ref::<&str>().copied())
|
||||||
|
.unwrap_or("Unknown panic message");
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
panic_msg.contains("WAL frame checksum mismatch."),
|
||||||
|
"Expected panic message not found. Got: {}",
|
||||||
|
panic_msg
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(_) => panic!("Expected query to panic, but it succeeded"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn run_query(tmp_db: &TempDatabase, conn: &Arc<Connection>, query: &str) -> anyhow::Result<()> {
|
fn run_query(tmp_db: &TempDatabase, conn: &Arc<Connection>, query: &str) -> anyhow::Result<()> {
|
||||||
run_query_core(tmp_db, conn, query, None::<fn(&Row)>)
|
run_query_core(tmp_db, conn, query, None::<fn(&Row)>)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user