core/mvcc/logical-log: lock the logical log recovery process

This commit is contained in:
Pere Diaz Bou
2025-09-26 12:47:31 +02:00
parent 334da8abbb
commit 83d8a7c775
3 changed files with 93 additions and 47 deletions

View File

@@ -14,6 +14,7 @@ use crate::types::IOResult;
use crate::types::ImmutableRecord;
use crate::types::SeekResult;
use crate::Completion;
use crate::File;
use crate::IOExt;
use crate::LimboError;
use crate::Result;
@@ -32,6 +33,9 @@ use tracing::Level;
pub mod checkpoint_state_machine;
pub use checkpoint_state_machine::{CheckpointState, CheckpointStateMachine};
use super::persistent_storage::logical_log::StreamingLogicalLogReader;
use super::persistent_storage::logical_log::StreamingResult;
#[cfg(test)]
pub mod tests;
@@ -855,6 +859,10 @@ pub struct MvStore<Clock: LogicalClock> {
/// If there are two concurrent BEGIN (non-CONCURRENT) transactions, and one tries to promote
/// to exclusive, it will abort if another transaction committed after its begin timestamp.
last_committed_tx_ts: AtomicU64,
/// Lock used while recovering a logical log file. We don't want multiple connections trying to
/// load the file.
recover_lock: RwLock<()>,
}
impl<Clock: LogicalClock> MvStore<Clock> {
@@ -878,6 +886,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
checkpointed_txid_max: AtomicU64::new(0),
last_committed_schema_change_ts: AtomicU64::new(0),
last_committed_tx_ts: AtomicU64::new(0),
recover_lock: RwLock::new(()),
}
}
@@ -1739,6 +1748,52 @@ impl<Clock: LogicalClock> MvStore<Clock> {
.unwrap_or(None);
last_rowid
}
/// Reports whether the logical log still has to be recovered (replayed)
/// before this store can be used; delegates to the underlying storage.
pub fn needs_recover(&self) -> bool {
    self.storage.needs_recover()
}
/// Clears the recovery flag once the logical log has been fully replayed;
/// delegates to the underlying storage.
pub fn mark_recovered(&self) {
    self.storage.mark_recovered();
}
/// Returns a shared handle to the file backing the logical log.
pub fn get_logical_log_file(&self) -> Arc<dyn File> {
    self.storage.get_logical_log_file()
}
/// Replays the on-disk logical log into this store.
///
/// Recovery is serialized across connections via `recover_lock`: the
/// first connection to acquire the write lock performs the replay; any
/// connection that blocked on the lock re-checks `needs_recover()` after
/// acquiring it and returns immediately if recovery already happened.
///
/// All replayed records are applied under a single load transaction
/// (`tx_id = 0`) which is committed once EOF is reached.
///
/// # Errors
/// Propagates any I/O or record-decoding error from the log reader, and
/// any error from applying the replayed inserts/deletes.
pub fn recover_logical_log(&self, io: &Arc<dyn crate::IO>, pager: &Arc<Pager>) -> Result<()> {
    // Get lock; if we don't get it we will wait until recovery finishes in
    // another connection and then return via the check below.
    let _recover_guard = self.recover_lock.write();
    if !self.storage.needs_recover() {
        // Another connection completed recovery while we waited.
        return Ok(());
    }
    let file = self.get_logical_log_file();
    let mut reader = StreamingLogicalLogReader::new(file.clone());
    // The log header must be fully read before streaming records.
    let c = reader.read_header()?;
    io.wait_for_completion(c)?;
    let tx_id = 0;
    self.begin_load_tx(pager.clone())?;
    loop {
        // Propagate read/decode errors instead of panicking mid-recovery.
        match reader.next_record(io)? {
            StreamingResult::InsertRow { row, rowid } => {
                tracing::trace!("read {rowid:?}");
                self.insert(tx_id, row)?;
            }
            StreamingResult::DeleteRow { rowid } => {
                self.delete(tx_id, rowid)?;
            }
            StreamingResult::Eof => {
                break;
            }
        }
    }
    self.commit_load_tx(tx_id);
    self.mark_recovered();
    Ok(())
}
}
/// A write-write conflict happens when transaction T_current attempts to update a

View File

@@ -2,22 +2,20 @@
#![allow(dead_code)]
use crate::{
io::ReadComplete,
mvcc::{
database::{LogRecord, Row, RowID, RowVersion},
LocalClock, MvStore,
},
mvcc::database::{LogRecord, Row, RowID, RowVersion},
storage::sqlite3_ondisk::{read_varint, write_varint_to_vec},
turso_assert,
types::{IOCompletions, ImmutableRecord},
Buffer, Completion, CompletionError, LimboError, Pager, Result,
Buffer, Completion, CompletionError, LimboError, Result,
};
use std::{cell::RefCell, sync::Arc};
use crate::{types::IOResult, File};
pub struct LogicalLog {
file: Arc<dyn File>,
pub file: Arc<dyn File>,
offset: u64,
recover: bool,
}
/// Log's Header, this will be the 64 bytes in any logical log file.
@@ -138,7 +136,12 @@ impl LogRecordType {
impl LogicalLog {
/// Creates a logical log backed by `file`.
///
/// If the file already contains data, a previous session left records
/// behind, so the log is flagged as needing recovery (replay) before use.
pub fn new(file: Arc<dyn File>) -> Self {
    // A non-empty file means there are records to recover.
    let recover = file
        .size()
        .expect("logical log file size must be readable at open")
        > 0;
    Self {
        file,
        offset: 0,
        recover,
    }
}
pub fn log_tx(&mut self, tx: &LogRecord) -> Result<IOResult<()>> {
@@ -213,9 +216,17 @@ impl LogicalLog {
self.offset = 0;
Ok(IOResult::IO(IOCompletions::Single(c)))
}
/// True while the log file's existing contents have not yet been replayed.
pub fn needs_recover(&self) -> bool {
    self.recover
}
/// Marks replay as complete so subsequent callers skip recovery.
pub fn mark_recovered(&mut self) {
    self.recover = false;
}
}
enum StreamingResult {
pub enum StreamingResult {
InsertRow { row: Row, rowid: RowID },
DeleteRow { rowid: RowID },
Eof,
@@ -230,7 +241,7 @@ enum StreamingState {
},
}
struct StreamingLogicalLogReader {
pub struct StreamingLogicalLogReader {
file: Arc<dyn File>,
/// Offset to read from file
offset: usize,
@@ -436,38 +447,6 @@ impl StreamingLogicalLogReader {
}
}
/// Replays every record in `file` into `mv_store` under a single load
/// transaction, then returns a `LogicalLog` wrapping that same file.
pub fn load_logical_log(
    mv_store: &Arc<MvStore<LocalClock>>,
    file: Arc<dyn File>,
    io: &Arc<dyn crate::IO>,
    pager: &Arc<Pager>,
) -> Result<LogicalLog> {
    let mut log_reader = StreamingLogicalLogReader::new(file.clone());
    // The header must be fully read before any records are streamed.
    let header_completion = log_reader.read_header()?;
    io.wait_for_completion(header_completion)?;
    // Every replayed record is applied under one synthetic load transaction.
    let tx_id = 0;
    mv_store.begin_load_tx(pager.clone())?;
    let mut reached_eof = false;
    while !reached_eof {
        match log_reader.next_record(io).unwrap() {
            StreamingResult::InsertRow { row, rowid } => {
                tracing::trace!("read {rowid:?}");
                mv_store.insert(tx_id, row)?;
            }
            StreamingResult::DeleteRow { rowid } => {
                mv_store.delete(tx_id, rowid)?;
            }
            StreamingResult::Eof => reached_eof = true,
        }
    }
    mv_store.commit_load_tx(tx_id);
    Ok(LogicalLog::new(file))
}
#[cfg(test)]
mod tests {
use std::{collections::HashSet, sync::Arc};
@@ -491,7 +470,7 @@ mod tests {
OpenFlags, RefValue,
};
use super::{load_logical_log, LogRecordType};
use super::LogRecordType;
#[test]
fn test_logical_log_read() {
@@ -517,7 +496,7 @@ mod tests {
let file = io.open_file(log_file, OpenFlags::ReadOnly, false).unwrap();
let mvcc_store = Arc::new(MvStore::new(LocalClock::new(), Storage::new(file.clone())));
load_logical_log(&mvcc_store, file, &io, &pager).unwrap();
mvcc_store.recover_logical_log(&io, &pager).unwrap();
let tx = mvcc_store.begin_tx(pager.clone()).unwrap();
let row = mvcc_store.read(tx, RowID::new(1, 1)).unwrap().unwrap();
let record = ImmutableRecord::from_bin_record(row.data.clone());
@@ -559,7 +538,7 @@ mod tests {
let file = io.open_file(log_file, OpenFlags::ReadOnly, false).unwrap();
let mvcc_store = Arc::new(MvStore::new(LocalClock::new(), Storage::new(file.clone())));
load_logical_log(&mvcc_store, file, &io, &pager).unwrap();
mvcc_store.recover_logical_log(&io, &pager).unwrap();
for (rowid, value) in &values {
let tx = mvcc_store.begin_tx(pager.clone()).unwrap();
let row = mvcc_store.read(tx, *rowid).unwrap().unwrap();
@@ -650,7 +629,7 @@ mod tests {
let file = io.open_file(log_file, OpenFlags::ReadOnly, false).unwrap();
let mvcc_store = Arc::new(MvStore::new(LocalClock::new(), Storage::new(file.clone())));
load_logical_log(&mvcc_store, file, &io, &pager).unwrap();
mvcc_store.recover_logical_log(&io, &pager).unwrap();
// Check rowids that weren't deleted
let tx = mvcc_store.begin_tx(pager.clone()).unwrap();

View File

@@ -1,14 +1,14 @@
use std::fmt::Debug;
use std::sync::{Arc, RwLock};
mod logical_log;
pub mod logical_log;
use crate::mvcc::database::LogRecord;
use crate::mvcc::persistent_storage::logical_log::LogicalLog;
use crate::types::IOResult;
use crate::{File, Result};
pub struct Storage {
logical_log: RwLock<LogicalLog>,
pub logical_log: RwLock<LogicalLog>,
}
impl Storage {
@@ -35,6 +35,18 @@ impl Storage {
/// Truncates the logical log under an exclusive lock; delegates to
/// `LogicalLog::truncate`.
pub fn truncate(&self) -> Result<IOResult<()>> {
    self.logical_log.write().unwrap().truncate()
}
/// Reports whether the logical log still needs to be recovered (replayed).
/// Only a shared (read) lock is required for this flag check.
pub fn needs_recover(&self) -> bool {
    self.logical_log.read().unwrap().needs_recover()
}
/// Clears the log's recovery flag; takes the write lock because the flag
/// is mutated.
pub fn mark_recovered(&self) {
    self.logical_log.write().unwrap().mark_recovered();
}
/// Returns a shared handle to the file backing the logical log.
///
/// Cloning the `Arc` only needs shared access, so a read lock suffices —
/// taking the write lock here would needlessly block concurrent readers.
pub fn get_logical_log_file(&self) -> Arc<dyn File> {
    self.logical_log.read().unwrap().file.clone()
}
}
impl Debug for Storage {