adjust logical log IO functions to return Completions and not IOResult

This commit is contained in:
pedrocarlo
2025-10-03 01:33:20 -03:00
parent b11246278f
commit 131a5b8048
4 changed files with 43 additions and 51 deletions

View File

@@ -7,9 +7,10 @@ use crate::state_machine::{StateMachine, StateTransition, TransitionResult};
use crate::storage::btree::BTreeCursor;
use crate::storage::pager::CreateBTreeFlags;
use crate::storage::wal::{CheckpointMode, TursoRwLock};
use crate::types::{IOResult, ImmutableRecord, RecordCursor};
use crate::types::{IOCompletions, IOResult, ImmutableRecord, RecordCursor};
use crate::{
CheckpointResult, Connection, IOExt, Pager, RefValue, Result, TransactionState, Value,
CheckpointResult, Completion, Connection, IOExt, Pager, RefValue, Result, TransactionState,
Value,
};
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet};
@@ -271,12 +272,12 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
}
/// Fsync the logical log file
fn fsync_logical_log(&self) -> Result<IOResult<()>> {
fn fsync_logical_log(&self) -> Result<Completion> {
self.mvstore.storage.sync()
}
/// Truncate the logical log file
fn truncate_logical_log(&self) -> Result<IOResult<()>> {
fn truncate_logical_log(&self) -> Result<Completion> {
self.mvstore.storage.truncate()
}
@@ -572,30 +573,25 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
CheckpointState::TruncateLogicalLog => {
tracing::debug!("Truncating logical log file");
match self.truncate_logical_log()? {
IOResult::Done(_) => {
self.state = CheckpointState::FsyncLogicalLog;
Ok(TransitionResult::Continue)
}
IOResult::IO(io) => {
if io.finished() {
self.state = CheckpointState::CheckpointWal;
Ok(TransitionResult::Continue)
} else {
Ok(TransitionResult::Io(io))
}
}
let c = self.truncate_logical_log()?;
self.state = CheckpointState::FsyncLogicalLog;
// if Completion Completed without errors we can continue
if c.is_completed() {
Ok(TransitionResult::Continue)
} else {
Ok(TransitionResult::Io(IOCompletions::Single(c)))
}
}
CheckpointState::FsyncLogicalLog => {
tracing::debug!("Fsyncing logical log file");
match self.fsync_logical_log()? {
IOResult::Done(_) => {
self.state = CheckpointState::CheckpointWal;
Ok(TransitionResult::Continue)
}
IOResult::IO(io) => Ok(TransitionResult::Io(io)),
let c = self.fsync_logical_log()?;
self.state = CheckpointState::CheckpointWal;
// if Completion Completed without errors we can continue
if c.is_completed() {
Ok(TransitionResult::Continue)
} else {
Ok(TransitionResult::Io(IOCompletions::Single(c)))
}
}

View File

@@ -611,27 +611,24 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
)));
}
}
let result = mvcc_store.storage.log_tx(log_record)?;
let c = mvcc_store.storage.log_tx(log_record)?;
self.state = CommitState::SyncLogicalLog { end_ts: *end_ts };
match result {
IOResult::Done(_) => {}
IOResult::IO(io) => {
if !io.finished() {
return Ok(TransitionResult::Io(io));
}
}
// if Completion Completed without errors we can continue
if c.is_completed() {
Ok(TransitionResult::Continue)
} else {
Ok(TransitionResult::Io(IOCompletions::Single(c)))
}
return Ok(TransitionResult::Continue);
}
CommitState::SyncLogicalLog { end_ts } => {
let result = mvcc_store.storage.sync()?;
let c = mvcc_store.storage.sync()?;
self.state = CommitState::EndCommitLogicalLog { end_ts: *end_ts };
if let IOResult::IO(io) = result {
if !io.finished() {
return Ok(TransitionResult::Io(io));
}
// if Completion Completed without errors we can continue
if c.is_completed() {
Ok(TransitionResult::Continue)
} else {
Ok(TransitionResult::Io(IOCompletions::Single(c)))
}
return Ok(TransitionResult::Continue);
}
CommitState::EndCommitLogicalLog { end_ts } => {
let connection = self.connection.clone();

View File

@@ -5,12 +5,12 @@ use crate::{
mvcc::database::{LogRecord, MVTableId, Row, RowID, RowVersion},
storage::sqlite3_ondisk::{read_varint, write_varint_to_vec},
turso_assert,
types::{IOCompletions, ImmutableRecord},
types::ImmutableRecord,
Buffer, Completion, CompletionError, LimboError, Result,
};
use std::sync::{Arc, RwLock};
use crate::{types::IOResult, File};
use crate::File;
pub struct LogicalLog {
pub file: Arc<dyn File>,
@@ -143,7 +143,7 @@ impl LogicalLog {
Self { file, offset: 0 }
}
pub fn log_tx(&mut self, tx: &LogRecord) -> Result<IOResult<()>> {
pub fn log_tx(&mut self, tx: &LogRecord) -> Result<Completion> {
let mut buffer = Vec::new();
// 1. Serialize log header if it's first write
@@ -194,18 +194,18 @@ impl LogicalLog {
let buffer_len = buffer.len();
let c = self.file.pwrite(self.offset, buffer, c)?;
self.offset += buffer_len as u64;
Ok(IOResult::IO(IOCompletions::Single(c)))
Ok(c)
}
pub fn sync(&mut self) -> Result<IOResult<()>> {
pub fn sync(&mut self) -> Result<Completion> {
let completion = Completion::new_sync(move |_| {
tracing::debug!("logical_log_sync finish");
});
let c = self.file.sync(completion)?;
Ok(IOResult::IO(IOCompletions::Single(c)))
Ok(c)
}
pub fn truncate(&mut self) -> Result<IOResult<()>> {
pub fn truncate(&mut self) -> Result<Completion> {
let completion = Completion::new_trunc(move |result| {
if let Err(err) = result {
tracing::error!("logical_log_truncate failed: {}", err);
@@ -213,7 +213,7 @@ impl LogicalLog {
});
let c = self.file.truncate(0, completion)?;
self.offset = 0;
Ok(IOResult::IO(IOCompletions::Single(c)))
Ok(c)
}
}

View File

@@ -4,8 +4,7 @@ use std::sync::{Arc, RwLock};
pub mod logical_log;
use crate::mvcc::database::LogRecord;
use crate::mvcc::persistent_storage::logical_log::LogicalLog;
use crate::types::IOResult;
use crate::{File, Result};
use crate::{Completion, File, Result};
pub struct Storage {
pub logical_log: RwLock<LogicalLog>,
@@ -20,7 +19,7 @@ impl Storage {
}
impl Storage {
pub fn log_tx(&self, m: &LogRecord) -> Result<IOResult<()>> {
pub fn log_tx(&self, m: &LogRecord) -> Result<Completion> {
self.logical_log.write().unwrap().log_tx(m)
}
@@ -28,11 +27,11 @@ impl Storage {
todo!()
}
pub fn sync(&self) -> Result<IOResult<()>> {
pub fn sync(&self) -> Result<Completion> {
self.logical_log.write().unwrap().sync()
}
pub fn truncate(&self) -> Result<IOResult<()>> {
pub fn truncate(&self) -> Result<Completion> {
self.logical_log.write().unwrap().truncate()
}