diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 06c935bc4..0383182de 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -14,6 +14,7 @@ use crate::types::IOResult; use crate::types::ImmutableRecord; use crate::types::SeekResult; use crate::Completion; +use crate::File; use crate::IOExt; use crate::LimboError; use crate::Result; @@ -32,6 +33,9 @@ use tracing::Level; pub mod checkpoint_state_machine; pub use checkpoint_state_machine::{CheckpointState, CheckpointStateMachine}; +use super::persistent_storage::logical_log::StreamingLogicalLogReader; +use super::persistent_storage::logical_log::StreamingResult; + #[cfg(test)] pub mod tests; @@ -855,6 +859,10 @@ pub struct MvStore { /// If there are two concurrent BEGIN (non-CONCURRENT) transactions, and one tries to promote /// to exclusive, it will abort if another transaction committed after its begin timestamp. last_committed_tx_ts: AtomicU64, + + /// Lock used while recovering a logical log file. We don't want multiple connections trying to + /// load the file. + recover_lock: RwLock<()>, } impl MvStore { @@ -878,6 +886,7 @@ impl MvStore { checkpointed_txid_max: AtomicU64::new(0), last_committed_schema_change_ts: AtomicU64::new(0), last_committed_tx_ts: AtomicU64::new(0), + recover_lock: RwLock::new(()), } } @@ -1739,6 +1748,52 @@ impl MvStore { .unwrap_or(None); last_rowid } + + pub fn needs_recover(&self) -> bool { + self.storage.needs_recover() + } + + pub fn mark_recovered(&self) { + self.storage.mark_recovered(); + } + + pub fn get_logical_log_file(&self) -> Arc { + self.storage.get_logical_log_file() + } + + pub fn recover_logical_log(&self, io: &Arc, pager: &Arc) -> Result<()> { + // Get lock, if we don't get it we will wait until recover finishes in another connection + // and then return. + let _recover_guard = self.recover_lock.write(); + if !self.storage.needs_recover() { + // another connection completed recover + return Ok(()); + } + let file = self.get_logical_log_file(); + let mut reader = StreamingLogicalLogReader::new(file.clone()); + + let c = reader.read_header()?; + io.wait_for_completion(c)?; + let tx_id = 0; + self.begin_load_tx(pager.clone())?; + loop { + match reader.next_record(io).unwrap() { + StreamingResult::InsertRow { row, rowid } => { + tracing::trace!("read {rowid:?}"); + self.insert(tx_id, row)?; + } + StreamingResult::DeleteRow { rowid } => { + self.delete(tx_id, rowid)?; + } + StreamingResult::Eof => { + break; + } + } + } + self.commit_load_tx(tx_id); + self.mark_recovered(); + Ok(()) + } } /// A write-write conflict happens when transaction T_current attempts to update a diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs index b64cb5acc..987477aa5 100644 --- a/core/mvcc/persistent_storage/logical_log.rs +++ b/core/mvcc/persistent_storage/logical_log.rs @@ -2,22 +2,20 @@ #![allow(dead_code)] use crate::{ io::ReadComplete, - mvcc::{ - database::{LogRecord, Row, RowID, RowVersion}, - LocalClock, MvStore, - }, + mvcc::database::{LogRecord, Row, RowID, RowVersion}, storage::sqlite3_ondisk::{read_varint, write_varint_to_vec}, turso_assert, types::{IOCompletions, ImmutableRecord}, - Buffer, Completion, CompletionError, LimboError, Pager, Result, + Buffer, Completion, CompletionError, LimboError, Result, }; use std::{cell::RefCell, sync::Arc}; use crate::{types::IOResult, File}; pub struct LogicalLog { - file: Arc, + pub file: Arc, offset: u64, + recover: bool, } /// Log's Header, this will be the 64 bytes in any logical log file. @@ -138,7 +136,12 @@ impl LogRecordType { impl LogicalLog { pub fn new(file: Arc) -> Self { - Self { file, offset: 0 } + let recover = file.size().unwrap() > 0; + Self { + file, + offset: 0, + recover, + } } pub fn log_tx(&mut self, tx: &LogRecord) -> Result> { @@ -213,9 +216,17 @@ impl LogicalLog { self.offset = 0; Ok(IOResult::IO(IOCompletions::Single(c))) } + + pub fn needs_recover(&self) -> bool { + self.recover + } + + pub fn mark_recovered(&mut self) { + self.recover = false; + } } -enum StreamingResult { +pub enum StreamingResult { InsertRow { row: Row, rowid: RowID }, DeleteRow { rowid: RowID }, Eof, @@ -230,7 +241,7 @@ enum StreamingState { }, } -struct StreamingLogicalLogReader { +pub struct StreamingLogicalLogReader { file: Arc, /// Offset to read from file offset: usize, @@ -436,38 +447,6 @@ impl StreamingLogicalLogReader { } } -pub fn load_logical_log( - mv_store: &Arc>, - file: Arc, - io: &Arc, - pager: &Arc, -) -> Result { - let mut reader = StreamingLogicalLogReader::new(file.clone()); - - let c = reader.read_header()?; - io.wait_for_completion(c)?; - let tx_id = 0; - mv_store.begin_load_tx(pager.clone())?; - loop { - match reader.next_record(io).unwrap() { - StreamingResult::InsertRow { row, rowid } => { - tracing::trace!("read {rowid:?}"); - mv_store.insert(tx_id, row)?; - } - StreamingResult::DeleteRow { rowid } => { - mv_store.delete(tx_id, rowid)?; - } - StreamingResult::Eof => { - break; - } - } - } - mv_store.commit_load_tx(tx_id); - - let logical_log = LogicalLog::new(file); - Ok(logical_log) -} - #[cfg(test)] mod tests { use std::{collections::HashSet, sync::Arc}; @@ -491,7 +470,7 @@ mod tests { OpenFlags, RefValue, }; - use super::{load_logical_log, LogRecordType}; + use super::LogRecordType; #[test] fn test_logical_log_read() { @@ -517,7 +496,7 @@ mod tests { let file = io.open_file(log_file, OpenFlags::ReadOnly, false).unwrap(); let mvcc_store = Arc::new(MvStore::new(LocalClock::new(), Storage::new(file.clone()))); - load_logical_log(&mvcc_store, file, &io, &pager).unwrap(); + mvcc_store.recover_logical_log(&io, &pager).unwrap(); let tx = mvcc_store.begin_tx(pager.clone()).unwrap(); let row = mvcc_store.read(tx, RowID::new(1, 1)).unwrap().unwrap(); let record = ImmutableRecord::from_bin_record(row.data.clone()); @@ -559,7 +538,7 @@ mod tests { let file = io.open_file(log_file, OpenFlags::ReadOnly, false).unwrap(); let mvcc_store = Arc::new(MvStore::new(LocalClock::new(), Storage::new(file.clone()))); - load_logical_log(&mvcc_store, file, &io, &pager).unwrap(); + mvcc_store.recover_logical_log(&io, &pager).unwrap(); for (rowid, value) in &values { let tx = mvcc_store.begin_tx(pager.clone()).unwrap(); let row = mvcc_store.read(tx, *rowid).unwrap().unwrap(); @@ -650,7 +629,7 @@ mod tests { let file = io.open_file(log_file, OpenFlags::ReadOnly, false).unwrap(); let mvcc_store = Arc::new(MvStore::new(LocalClock::new(), Storage::new(file.clone()))); - load_logical_log(&mvcc_store, file, &io, &pager).unwrap(); + mvcc_store.recover_logical_log(&io, &pager).unwrap(); // Check rowids that weren't deleted let tx = mvcc_store.begin_tx(pager.clone()).unwrap(); diff --git a/core/mvcc/persistent_storage/mod.rs b/core/mvcc/persistent_storage/mod.rs index 58af1b849..0d5d2967f 100644 --- a/core/mvcc/persistent_storage/mod.rs +++ b/core/mvcc/persistent_storage/mod.rs @@ -1,14 +1,14 @@ use std::fmt::Debug; use std::sync::{Arc, RwLock}; -mod logical_log; +pub mod logical_log; use crate::mvcc::database::LogRecord; use crate::mvcc::persistent_storage::logical_log::LogicalLog; use crate::types::IOResult; use crate::{File, Result}; pub struct Storage { - logical_log: RwLock, + pub logical_log: RwLock, } impl Storage { @@ -35,6 +35,18 @@ impl Storage { pub fn truncate(&self) -> Result> { self.logical_log.write().unwrap().truncate() } + + pub fn needs_recover(&self) -> bool { + self.logical_log.read().unwrap().needs_recover() + } + + pub fn mark_recovered(&self) { + self.logical_log.write().unwrap().mark_recovered(); + } + + pub fn get_logical_log_file(&self) -> Arc { + self.logical_log.write().unwrap().file.clone() + } } impl Debug for Storage {