diff --git a/core/mvcc/database/checkpoint_state_machine.rs b/core/mvcc/database/checkpoint_state_machine.rs index 23fc97eaf..d2f5e5897 100644 --- a/core/mvcc/database/checkpoint_state_machine.rs +++ b/core/mvcc/database/checkpoint_state_machine.rs @@ -7,9 +7,10 @@ use crate::state_machine::{StateMachine, StateTransition, TransitionResult}; use crate::storage::btree::BTreeCursor; use crate::storage::pager::CreateBTreeFlags; use crate::storage::wal::{CheckpointMode, TursoRwLock}; -use crate::types::{IOResult, ImmutableRecord, RecordCursor}; +use crate::types::{IOCompletions, IOResult, ImmutableRecord, RecordCursor}; use crate::{ - CheckpointResult, Connection, IOExt, Pager, RefValue, Result, TransactionState, Value, + CheckpointResult, Completion, Connection, IOExt, Pager, RefValue, Result, TransactionState, + Value, }; use parking_lot::RwLock; use std::collections::{HashMap, HashSet}; @@ -271,12 +272,12 @@ impl CheckpointStateMachine { } /// Fsync the logical log file - fn fsync_logical_log(&self) -> Result> { + fn fsync_logical_log(&self) -> Result { self.mvstore.storage.sync() } /// Truncate the logical log file - fn truncate_logical_log(&self) -> Result> { + fn truncate_logical_log(&self) -> Result { self.mvstore.storage.truncate() } @@ -572,30 +573,25 @@ impl CheckpointStateMachine { CheckpointState::TruncateLogicalLog => { tracing::debug!("Truncating logical log file"); - match self.truncate_logical_log()? { - IOResult::Done(_) => { - self.state = CheckpointState::FsyncLogicalLog; - Ok(TransitionResult::Continue) - } - IOResult::IO(io) => { - if io.finished() { - self.state = CheckpointState::CheckpointWal; - Ok(TransitionResult::Continue) - } else { - Ok(TransitionResult::Io(io)) - } - } + let c = self.truncate_logical_log()?; + self.state = CheckpointState::FsyncLogicalLog; + // if Completion Completed without errors we can continue + if c.is_completed() { + Ok(TransitionResult::Continue) + } else { + Ok(TransitionResult::Io(IOCompletions::Single(c))) } } CheckpointState::FsyncLogicalLog => { tracing::debug!("Fsyncing logical log file"); - match self.fsync_logical_log()? { - IOResult::Done(_) => { - self.state = CheckpointState::CheckpointWal; - Ok(TransitionResult::Continue) - } - IOResult::IO(io) => Ok(TransitionResult::Io(io)), + let c = self.fsync_logical_log()?; + self.state = CheckpointState::CheckpointWal; + // if Completion Completed without errors we can continue + if c.is_completed() { + Ok(TransitionResult::Continue) + } else { + Ok(TransitionResult::Io(IOCompletions::Single(c))) } } diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 486cfb923..e9794289d 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -611,27 +611,24 @@ impl StateTransition for CommitStateMachine { ))); } } - let result = mvcc_store.storage.log_tx(log_record)?; + let c = mvcc_store.storage.log_tx(log_record)?; self.state = CommitState::SyncLogicalLog { end_ts: *end_ts }; - match result { - IOResult::Done(_) => {} - IOResult::IO(io) => { - if !io.finished() { - return Ok(TransitionResult::Io(io)); - } - } + // if Completion Completed without errors we can continue + if c.is_completed() { + Ok(TransitionResult::Continue) + } else { + Ok(TransitionResult::Io(IOCompletions::Single(c))) } - return Ok(TransitionResult::Continue); } CommitState::SyncLogicalLog { end_ts } => { - let result = mvcc_store.storage.sync()?; + let c = mvcc_store.storage.sync()?; self.state = CommitState::EndCommitLogicalLog { end_ts: *end_ts }; - if let IOResult::IO(io) = result { - if !io.finished() { - return Ok(TransitionResult::Io(io)); - } + // if Completion Completed without errors we can continue + if c.is_completed() { + Ok(TransitionResult::Continue) + } else { + Ok(TransitionResult::Io(IOCompletions::Single(c))) } - return Ok(TransitionResult::Continue); } CommitState::EndCommitLogicalLog { end_ts } => { let connection = self.connection.clone(); diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs index bb522796e..591dd866f 100644 --- a/core/mvcc/persistent_storage/logical_log.rs +++ b/core/mvcc/persistent_storage/logical_log.rs @@ -5,12 +5,12 @@ use crate::{ mvcc::database::{LogRecord, MVTableId, Row, RowID, RowVersion}, storage::sqlite3_ondisk::{read_varint, write_varint_to_vec}, turso_assert, - types::{IOCompletions, ImmutableRecord}, + types::ImmutableRecord, Buffer, Completion, CompletionError, LimboError, Result, }; use std::sync::{Arc, RwLock}; -use crate::{types::IOResult, File}; +use crate::File; pub struct LogicalLog { pub file: Arc, @@ -143,7 +143,7 @@ impl LogicalLog { Self { file, offset: 0 } } - pub fn log_tx(&mut self, tx: &LogRecord) -> Result> { + pub fn log_tx(&mut self, tx: &LogRecord) -> Result { let mut buffer = Vec::new(); // 1. Serialize log header if it's first write @@ -194,18 +194,18 @@ impl LogicalLog { let buffer_len = buffer.len(); let c = self.file.pwrite(self.offset, buffer, c)?; self.offset += buffer_len as u64; - Ok(IOResult::IO(IOCompletions::Single(c))) + Ok(c) } - pub fn sync(&mut self) -> Result> { + pub fn sync(&mut self) -> Result { let completion = Completion::new_sync(move |_| { tracing::debug!("logical_log_sync finish"); }); let c = self.file.sync(completion)?; - Ok(IOResult::IO(IOCompletions::Single(c))) + Ok(c) } - pub fn truncate(&mut self) -> Result> { + pub fn truncate(&mut self) -> Result { let completion = Completion::new_trunc(move |result| { if let Err(err) = result { tracing::error!("logical_log_truncate failed: {}", err); @@ -213,7 +213,7 @@ impl LogicalLog { }); let c = self.file.truncate(0, completion)?; self.offset = 0; - Ok(IOResult::IO(IOCompletions::Single(c))) + Ok(c) } } diff --git a/core/mvcc/persistent_storage/mod.rs b/core/mvcc/persistent_storage/mod.rs index 063eab90a..cafcb3230 100644 --- a/core/mvcc/persistent_storage/mod.rs +++ b/core/mvcc/persistent_storage/mod.rs @@ -4,8 +4,7 @@ use std::sync::{Arc, RwLock}; pub mod logical_log; use crate::mvcc::database::LogRecord; use crate::mvcc::persistent_storage::logical_log::LogicalLog; -use crate::types::IOResult; -use crate::{File, Result}; +use crate::{Completion, File, Result}; pub struct Storage { pub logical_log: RwLock, @@ -20,7 +19,7 @@ impl Storage { } impl Storage { - pub fn log_tx(&self, m: &LogRecord) -> Result> { + pub fn log_tx(&self, m: &LogRecord) -> Result { self.logical_log.write().unwrap().log_tx(m) } @@ -28,11 +27,11 @@ impl Storage { todo!() } - pub fn sync(&self) -> Result> { + pub fn sync(&self) -> Result { self.logical_log.write().unwrap().sync() } - pub fn truncate(&self) -> Result> { + pub fn truncate(&self) -> Result { self.logical_log.write().unwrap().truncate() }