thread safely store the result of completion

This commit is contained in:
pedrocarlo
2025-08-15 00:14:49 -03:00
parent de1811dea7
commit 66171527b4

View File

@@ -7,7 +7,7 @@ use std::cell::RefCell;
use std::fmt;
use std::ptr::NonNull;
use std::sync::{Arc, OnceLock};
use std::{cell::Cell, fmt::Debug, pin::Pin};
use std::{fmt::Debug, pin::Pin};
pub trait File: Send + Sync {
fn lock_file(&self, exclusive: bool) -> Result<()>;
@@ -87,11 +87,10 @@ pub trait IO: Clock + Send + Sync {
fn run_once(&self) -> Result<()>;
fn wait_for_completion(&self, c: Completion) -> Result<()> {
while !c.has_error() && !c.is_completed() {
while !c.finished() {
self.run_once()?
}
if c.has_error() {
let err = c.inner.error.get().cloned().unwrap();
if let Some(Some(err)) = c.inner.result.get().copied() {
return Err(err.into());
}
Ok(())
@@ -128,8 +127,9 @@ pub struct Completion {
#[derive(Debug)]
struct CompletionInner {
completion_type: CompletionType,
is_completed: Cell<bool>,
error: std::sync::OnceLock<CompletionError>,
/// None means we completed successfully
// Thread safe with OnceLock
result: std::sync::OnceLock<Option<CompletionError>>,
}
impl Debug for CompletionType {
@@ -155,8 +155,7 @@ impl Completion {
Self {
inner: Arc::new(CompletionInner {
completion_type,
is_completed: Cell::new(false),
error: OnceLock::new(),
result: OnceLock::new(),
}),
}
}
@@ -205,15 +204,24 @@ impl Completion {
}
pub fn is_completed(&self) -> bool {
self.inner.is_completed.get()
self.inner.result.get().is_some_and(|val| val.is_none())
}
pub fn has_error(&self) -> bool {
self.inner.error.get().is_some()
self.inner.result.get().is_some_and(|val| val.is_some())
}
/// Checks if the Completion completed or errored
pub fn finished(&self) -> bool {
self.inner.result.get().is_some()
}
pub fn complete(&self, result: i32) {
if !self.has_error() && !self.inner.is_completed.get() {
turso_assert!(
!self.finished(),
"should not complete a finished Completion"
);
if self.inner.result.set(None).is_ok() {
let result = Ok(result);
match &self.inner.completion_type {
CompletionType::Read(r) => r.callback(result),
@@ -221,16 +229,12 @@ impl Completion {
CompletionType::Sync(s) => s.callback(result), // fix
CompletionType::Truncate(t) => t.callback(result),
};
self.inner.is_completed.set(true);
}
}
pub fn error(&self, err: CompletionError) {
turso_assert!(
!self.is_completed(),
"should not error a completed Completion"
);
if !self.has_error() {
turso_assert!(!self.finished(), "should not error a finished Completion");
if self.inner.result.set(Some(err)).is_ok() {
let result = Err(err);
match &self.inner.completion_type {
CompletionType::Read(r) => r.callback(result),
@@ -238,7 +242,6 @@ impl Completion {
CompletionType::Sync(s) => s.callback(result), // fix
CompletionType::Truncate(t) => t.callback(result),
};
self.inner.error.get_or_init(|| err);
}
}