mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
read from database file only if max_frame_read_lock_index is 0 and max_frame < min_frame
- A transaction that was started with max_frame = 0 and max_frame_read_lock_index = 0 can still write to the WAL, and in that case it needs to read its data back from the WAL. Without cache spilling it's hard to reproduce this issue in turso-db right now, but I stumbled into it with the sync-engine, which does weird stuff with the WAL that "simulates" cache-spilling behaviour to some extent.
This commit is contained in:
@@ -1416,22 +1416,25 @@ impl Pager {
|
||||
"wal_insert_frame() called on database without WAL".to_string(),
|
||||
));
|
||||
};
|
||||
let mut wal = wal.borrow_mut();
|
||||
let (header, raw_page) = parse_wal_frame_header(frame);
|
||||
wal.write_frame_raw(
|
||||
self.buffer_pool.clone(),
|
||||
frame_no,
|
||||
header.page_number as u64,
|
||||
header.db_size as u64,
|
||||
raw_page,
|
||||
)?;
|
||||
if let Some(page) = self.cache_get(header.page_number as usize) {
|
||||
let content = page.get_contents();
|
||||
content.as_ptr().copy_from_slice(raw_page);
|
||||
turso_assert!(
|
||||
page.get().id == header.page_number as usize,
|
||||
"page has unexpected id"
|
||||
);
|
||||
|
||||
{
|
||||
let mut wal = wal.borrow_mut();
|
||||
wal.write_frame_raw(
|
||||
self.buffer_pool.clone(),
|
||||
frame_no,
|
||||
header.page_number as u64,
|
||||
header.db_size as u64,
|
||||
raw_page,
|
||||
)?;
|
||||
if let Some(page) = self.cache_get(header.page_number as usize) {
|
||||
let content = page.get_contents();
|
||||
content.as_ptr().copy_from_slice(raw_page);
|
||||
turso_assert!(
|
||||
page.get().id == header.page_number as usize,
|
||||
"page has unexpected id"
|
||||
);
|
||||
}
|
||||
}
|
||||
if header.page_number == 1 {
|
||||
let db_size = self
|
||||
|
||||
@@ -815,14 +815,14 @@ impl Wal for WalFile {
|
||||
// WAL and fetch pages directly from the DB file. We do this
|
||||
// by taking read‑lock 0, and capturing the latest state.
|
||||
if shared_max == nbackfills {
|
||||
let lock_idx = 0;
|
||||
if !self.get_shared().read_locks[lock_idx].read() {
|
||||
let lock_0_idx = 0;
|
||||
if !self.get_shared().read_locks[lock_0_idx].read() {
|
||||
return Ok((LimboResult::Busy, db_changed));
|
||||
}
|
||||
// we need to keep self.max_frame set to the appropriate
|
||||
// max frame in the wal at the time this transaction starts.
|
||||
self.max_frame = shared_max;
|
||||
self.max_frame_read_lock_index.set(lock_idx);
|
||||
self.max_frame_read_lock_index.set(lock_0_idx);
|
||||
self.min_frame = nbackfills + 1;
|
||||
self.last_checksum = last_checksum;
|
||||
return Ok((LimboResult::Ok, db_changed));
|
||||
@@ -965,7 +965,7 @@ impl Wal for WalFile {
|
||||
}
|
||||
|
||||
// Snapshot is stale, give up and let caller retry from scratch
|
||||
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch");
|
||||
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame, shared_max);
|
||||
shared.write_lock.unlock();
|
||||
Ok(LimboResult::Busy)
|
||||
}
|
||||
@@ -1000,8 +1000,18 @@ impl Wal for WalFile {
|
||||
"frame_watermark must be >= than current WAL backfill amount: frame_watermark={:?}, nBackfill={}", frame_watermark, self.get_shared().nbackfills.load(Ordering::Acquire)
|
||||
);
|
||||
|
||||
// if we are holding read_lock 0, skip and read right from db file.
|
||||
if self.max_frame_read_lock_index.get() == 0 {
|
||||
// if we are holding read_lock 0 and didn't write anything to the WAL, skip and read right from db file.
|
||||
//
|
||||
// note, that max_frame_read_lock_index is set to 0 only when shared_max_frame == nbackfill in which case
|
||||
// min_frame is set to nbackfill + 1 and max_frame is set to shared_max_frame
|
||||
//
|
||||
// by default, SQLite tries to restart log file in this case - but for now let's keep it simple in the turso-db
|
||||
if self.max_frame_read_lock_index.get() == 0 && self.max_frame < self.min_frame {
|
||||
tracing::debug!(
|
||||
"find_frame(page_id={}, frame_watermark={:?}): max_frame is 0 - read from DB file",
|
||||
page_id,
|
||||
frame_watermark,
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
let shared = self.get_shared();
|
||||
@@ -1009,8 +1019,21 @@ impl Wal for WalFile {
|
||||
let range = frame_watermark
|
||||
.map(|x| 0..=x)
|
||||
.unwrap_or(self.min_frame..=self.max_frame);
|
||||
tracing::debug!(
|
||||
"find_frame(page_id={}, frame_watermark={:?}): min_frame={}, max_frame={}",
|
||||
page_id,
|
||||
frame_watermark,
|
||||
self.min_frame,
|
||||
self.max_frame
|
||||
);
|
||||
if let Some(list) = frames.get(&page_id) {
|
||||
if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) {
|
||||
tracing::debug!(
|
||||
"find_frame(page_id={}, frame_watermark={:?}): found frame={}",
|
||||
page_id,
|
||||
frame_watermark,
|
||||
*f
|
||||
);
|
||||
return Ok(Some(*f));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -926,3 +926,53 @@ fn test_db_share_same_file() {
|
||||
]]
|
||||
);
|
||||
}
|
||||
|
||||
/// Regression test: writes raw WAL frames into a connection whose WAL was just
/// truncated, then keeps executing ordinary INSERTs on that same connection.
///
/// Steps performed:
/// 1. Create two empty temp databases and the same table `t` in each.
/// 2. Record conn1's current WAL `max_frame` as `watermark`.
/// 3. Run 128 random-blob INSERTs on conn1.
/// 4. Truncate-checkpoint conn2, open a WAL insert session on it, and copy
///    every conn1 frame after `watermark` into conn2 with `db_size` zeroed
///    in the frame header.
/// 5. Run 128 more random-blob INSERTs on conn2.
///
/// There are no explicit assertions: the test passes when none of the
/// `unwrap()` calls panic.
#[test]
|
||||
fn test_wal_api_simulate_spilled_frames() {
|
||||
// Only the RNG is used; the second element of the returned tuple is discarded.
let (mut rng, _) = rng_from_time();
|
||||
let db1 = TempDatabase::new_empty(false);
|
||||
let conn1 = db1.connect_limbo();
|
||||
let db2 = TempDatabase::new_empty(false);
|
||||
let conn2 = db2.connect_limbo();
|
||||
// Both databases get an identical schema.
conn1
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
|
||||
.unwrap();
|
||||
conn2
|
||||
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
|
||||
.unwrap();
|
||||
// conn1's WAL max_frame right after the CREATE TABLE; only frames after this
// point are copied to conn2 below.
let watermark = conn1.wal_state().unwrap().max_frame;
|
||||
// Populate conn1 with 128 rows: random u32 key, blob of 1..=4096 random bytes.
for _ in 0..128 {
|
||||
let key = rng.next_u32();
|
||||
let length = rng.next_u32() % 4096 + 1;
|
||||
conn1
|
||||
.execute(format!(
|
||||
"INSERT INTO t VALUES ({key}, randomblob({length}))"
|
||||
))
|
||||
.unwrap();
|
||||
}
|
||||
// Scratch buffer for one raw frame.
// NOTE(review): presumably a 24-byte WAL frame header followed by a
// 4096-byte page — confirm against the configured page size.
let mut frame = [0u8; 24 + 4096];
|
||||
// Truncate-checkpoint conn2 before inserting raw frames into it.
conn2
|
||||
.checkpoint(CheckpointMode::Truncate {
|
||||
upper_bound_inclusive: None,
|
||||
})
|
||||
.unwrap();
|
||||
// Open a WAL insert session on conn2. No matching "end" call is visible in
// this function; the INSERTs at the bottom run while it is still open.
conn2.wal_insert_begin().unwrap();
|
||||
let frames_count = conn1.wal_state().unwrap().max_frame;
|
||||
// Copy every conn1 frame written after the watermark into conn2.
for frame_id in watermark + 1..=frames_count {
|
||||
let mut info = conn1.wal_get_frame(frame_id, &mut frame).unwrap();
|
||||
// Zero db_size and write the header back into the frame buffer.
// NOTE(review): presumably this makes each copied frame look like a
// non-commit frame, as a spilled frame would — confirm against the
// WAL frame header semantics.
info.db_size = 0;
|
||||
info.put_to_frame_header(&mut frame);
|
||||
// Renumber frames so that the first copied frame lands at frame 1 in conn2.
conn2
|
||||
.wal_insert_frame(frame_id - watermark, &frame)
|
||||
.unwrap();
|
||||
}
|
||||
// Execute regular INSERTs on conn2 on top of the raw-inserted frames.
for _ in 0..128 {
|
||||
let key = rng.next_u32();
|
||||
let length = rng.next_u32() % 4096 + 1;
|
||||
conn2
|
||||
.execute(format!(
|
||||
"INSERT INTO t VALUES ({key}, randomblob({length}))"
|
||||
))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -284,6 +284,7 @@ fn test_wal_checkpoint() -> anyhow::Result<()> {
|
||||
let conn = tmp_db.connect_limbo();
|
||||
|
||||
for i in 0..iterations {
|
||||
log::info!("iteration #{}", i);
|
||||
let insert_query = format!("INSERT INTO test VALUES ({i})");
|
||||
do_flush(&conn, &tmp_db)?;
|
||||
conn.checkpoint(CheckpointMode::Passive {
|
||||
@@ -823,7 +824,7 @@ pub fn run_query_core(
|
||||
on_row(row)
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
r => panic!("unexpected step result: {r:?}"),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user