mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-16 05:24:22 +01:00
Update Page to carry epoch of frame + checkpont seq to ensure proper cached page for chkpt
This commit is contained in:
@@ -287,7 +287,6 @@ impl Page {
|
||||
self.get().wal_tag.store(TAG_UNSET, Ordering::Release)
|
||||
}
|
||||
|
||||
/// Check if this page is suitable for checkpointing
|
||||
pub fn is_valid_for_checkpoint(&self, target_frame: u64, seq: u32) -> bool {
|
||||
let (f, s) = self.wal_tag_pair();
|
||||
f == target_frame && s == seq && !self.is_dirty()
|
||||
@@ -916,10 +915,9 @@ impl Pager {
|
||||
let page = return_if_io!(self.allocate_page());
|
||||
if let Some(wal) = &self.wal {
|
||||
let max_frame = wal.borrow().get_max_frame_in_wal();
|
||||
let seq = wal.borrow().checkpoint_seq();
|
||||
// brand new page gets share max frame + 1
|
||||
page.set_wal_frame(max_frame + 1);
|
||||
} else {
|
||||
page.set_wal_frame(0);
|
||||
page.set_wal_tag(max_frame + 1, seq);
|
||||
}
|
||||
let page = Arc::new(BTreePageInner {
|
||||
page: RefCell::new(page),
|
||||
@@ -1417,7 +1415,8 @@ impl Pager {
|
||||
page.get().id == header.page_number as usize,
|
||||
"page has unexpected id"
|
||||
);
|
||||
page.set_wal_frame(frame_no);
|
||||
let seq = wal.checkpoint_seq();
|
||||
page.set_wal_tag(frame_no, seq);
|
||||
self.add_dirty(&page);
|
||||
}
|
||||
if header.is_commit_frame() {
|
||||
|
||||
@@ -281,7 +281,6 @@ pub trait Wal: Debug {
|
||||
fn sync(&mut self) -> Result<Completion>;
|
||||
fn is_syncing(&self) -> bool;
|
||||
fn get_max_frame_in_wal(&self) -> u64;
|
||||
fn get_nbackfills_in_wal(&self) -> u64;
|
||||
fn get_max_frame(&self) -> u64;
|
||||
fn get_min_frame(&self) -> u64;
|
||||
fn rollback(&mut self) -> Result<()>;
|
||||
@@ -303,7 +302,7 @@ pub enum CheckpointState {
|
||||
}
|
||||
|
||||
/// IOV_MAX is 1024 on most systems
|
||||
pub const CKPT_BATCH_PAGES: usize = 1024;
|
||||
pub const CKPT_BATCH_PAGES: usize = 512;
|
||||
|
||||
/// TODO: *ALL* of these need to be tuned for perf. It is tricky
|
||||
/// trying to figure out the ideal numbers here to work together concurrently
|
||||
@@ -414,7 +413,7 @@ impl std::ops::DerefMut for WriteBatch {
|
||||
}
|
||||
}
|
||||
|
||||
/// Information and
|
||||
/// Information and structures for processing a checkpoint operation.
|
||||
struct OngoingCheckpoint {
|
||||
/// Used for benchmarking/debugging a checkpoint operation.
|
||||
time: std::time::Instant,
|
||||
@@ -437,6 +436,17 @@ struct OngoingCheckpoint {
|
||||
}
|
||||
|
||||
impl OngoingCheckpoint {
|
||||
fn reset(&mut self) {
|
||||
self.min_frame = 0;
|
||||
self.max_frame = 0;
|
||||
self.current_page = 0;
|
||||
self.pages_to_checkpoint.clear();
|
||||
self.pending_writes.clear();
|
||||
self.inflight_reads.clear();
|
||||
self.inflight_writes.clear();
|
||||
self.state = CheckpointState::Start;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Whether or not new reads should be issued during checkpoint processing.
|
||||
fn should_issue_reads(&self) -> bool {
|
||||
@@ -482,7 +492,7 @@ impl OngoingCheckpoint {
|
||||
let mut moved = false;
|
||||
// retain only those still pending
|
||||
self.inflight_reads.retain(|slot| {
|
||||
if slot.done.load(Ordering::Acquire) {
|
||||
if slot.completion.is_completed() {
|
||||
if let Some(buf) = slot.buf.lock().take() {
|
||||
// read is done, take the buffer and add it to the batch to write
|
||||
self.pending_writes.insert(slot.page_id, buf);
|
||||
@@ -1196,7 +1206,6 @@ impl Wal for WalFile {
|
||||
let shared = self.get_shared();
|
||||
let header = shared.wal_header.clone();
|
||||
let header = header.lock();
|
||||
let seq = header.checkpoint_seq;
|
||||
let checksums = self.last_checksum;
|
||||
let page_content = page.get_contents();
|
||||
let page_buf = page_content.as_ptr();
|
||||
@@ -1300,10 +1309,6 @@ impl Wal for WalFile {
|
||||
self.min_frame
|
||||
}
|
||||
|
||||
fn get_nbackfills_in_wal(&self) -> u64 {
|
||||
self.get_shared().nbackfills.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::DEBUG)]
|
||||
fn rollback(&mut self) -> Result<()> {
|
||||
// TODO(pere): have to remove things from frame_cache because they are no longer valid.
|
||||
@@ -1463,15 +1468,8 @@ impl WalFile {
|
||||
}
|
||||
|
||||
fn reset_internal_states(&mut self) {
|
||||
self.ongoing_checkpoint.state = CheckpointState::Start;
|
||||
self.ongoing_checkpoint.min_frame = 0;
|
||||
self.ongoing_checkpoint.max_frame = 0;
|
||||
self.ongoing_checkpoint.current_page = 0;
|
||||
self.max_frame_read_lock_index.set(NO_LOCK_HELD);
|
||||
self.ongoing_checkpoint.batch.clear();
|
||||
self.ongoing_checkpoint.pending_flush.clear();
|
||||
self.sync_state.set(SyncState::NotSyncing);
|
||||
self.ongoing_checkpoint.pages_to_checkpoint.clear();
|
||||
self.ongoing_checkpoint.reset();
|
||||
self.syncing.set(false);
|
||||
}
|
||||
|
||||
@@ -1970,25 +1968,6 @@ impl WalFile {
|
||||
buf: buf_slot,
|
||||
})
|
||||
}
|
||||
|
||||
/// Drain completed inflight reads into the batch.
|
||||
/// Returns true if we moved at least one item.
|
||||
fn drain_inflight_into_batch(&mut self) -> bool {
|
||||
let mut moved = false;
|
||||
// retain only those still pending
|
||||
self.ongoing_checkpoint.inflight.retain(|slot| {
|
||||
if slot.done.load(Ordering::Acquire) {
|
||||
if let Some(buf) = slot.buf.lock().take() {
|
||||
self.ongoing_checkpoint.batch.insert(slot.page_id, buf);
|
||||
moved = true;
|
||||
}
|
||||
false // drop this slot
|
||||
} else {
|
||||
true // keep pending
|
||||
}
|
||||
});
|
||||
moved
|
||||
}
|
||||
}
|
||||
|
||||
impl WalFileShared {
|
||||
|
||||
Reference in New Issue
Block a user