diff --git a/core/mvcc/bindings/c/Cargo.toml b/core/mvcc/bindings/c/Cargo.toml index 4d8bb1427..04aa540b8 100644 --- a/core/mvcc/bindings/c/Cargo.toml +++ b/core/mvcc/bindings/c/Cargo.toml @@ -16,3 +16,8 @@ mvcc-rs = { path = "../../mvcc-rs" } tokio = { version = "1.27.0", features = ["full", "parking_lot"] } tracing = "0.1.37" tracing-subscriber = { version = "0" } + +[features] +default = [] +json_on_disk_storage = [] +s3_storage = [] diff --git a/core/mvcc/bindings/c/src/lib.rs b/core/mvcc/bindings/c/src/lib.rs index 9a64ddee5..8cb28a477 100644 --- a/core/mvcc/bindings/c/src/lib.rs +++ b/core/mvcc/bindings/c/src/lib.rs @@ -5,6 +5,7 @@ mod errors; mod types; use errors::MVCCError; +use mvcc_rs::persistent_storage::{s3, Storage}; use mvcc_rs::*; use types::{DbContext, MVCCDatabaseRef, MVCCScanCursorRef, ScanCursorContext}; @@ -19,6 +20,21 @@ type ScanCursor = cursor::ScanCursor<'static, Clock>; static INIT_RUST_LOG: std::sync::Once = std::sync::Once::new(); +async fn storage_for(main_db_path: &str) -> database::Result { + // TODO: let's accept an URL instead of main_db_path here, so we can + // pass custom S3 endpoints, options, etc. + if cfg!(feature = "json_on_disk_storage") { + tracing::info!("JSONonDisk storage stored in {main_db_path}-mvcc"); + return Ok(Storage::new_json_on_disk(format!("{main_db_path}-mvcc"))); + } + if cfg!(feature = "s3_storage") { + tracing::info!("S3 storage for {main_db_path}"); + return Storage::new_s3(s3::Options::with_create_bucket_if_not_exists(true)).await; + } + tracing::info!("No persistent storage for {main_db_path}"); + Ok(Storage::new_noop()) +} + #[no_mangle] pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCCDatabaseRef { INIT_RUST_LOG.call_once(|| { @@ -28,18 +44,28 @@ pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCC tracing::debug!("MVCCDatabaseOpen"); let clock = clock::LocalClock::new(); - let path = unsafe { std::ffi::CStr::from_ptr(path) }; - let path = match path.to_str() { + let main_db_path = unsafe { std::ffi::CStr::from_ptr(path) }; + let main_db_path = match main_db_path.to_str() { Ok(path) => path, Err(_) => { tracing::error!("Invalid UTF-8 path"); return MVCCDatabaseRef::null(); } }; - tracing::debug!("mvccrs: opening persistent storage at {path}"); - let storage = crate::persistent_storage::Storage::new_json_on_disk(path); - let db = Db::new(clock, storage); let runtime = tokio::runtime::Runtime::new().unwrap(); + + tracing::debug!("mvccrs: opening persistent storage for {main_db_path}"); + let storage = match runtime.block_on(storage_for(main_db_path)) { + Ok(storage) => storage, + Err(e) => { + tracing::error!("Failed to open persistent storage: {e}"); + return MVCCDatabaseRef::null(); + } + }; + let db = Db::new(clock, storage); + + runtime.block_on(db.recover()).ok(); + let ctx = DbContext { db, runtime }; let ctx = Box::leak(Box::new(ctx)); MVCCDatabaseRef::from(ctx) diff --git a/core/mvcc/mvcc-rs/Cargo.toml b/core/mvcc/mvcc-rs/Cargo.toml index e5c6ea399..40cbd2a7c 100644 --- a/core/mvcc/mvcc-rs/Cargo.toml +++ b/core/mvcc/mvcc-rs/Cargo.toml @@ -15,6 +15,9 @@ serde_json = "1.0.96" pin-project = "1.0.12" tracing-subscriber = { version = "0", optional = true } base64 = "0.21.0" +aws-sdk-s3 = "0.27.0" +aws-config = "0.55.2" +tokio-util = "0.7.8" [dev-dependencies] criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] } diff --git a/core/mvcc/mvcc-rs/src/database.rs b/core/mvcc/mvcc-rs/src/database.rs index 1c885be6e..84425f1af 100644 --- a/core/mvcc/mvcc-rs/src/database.rs +++ b/core/mvcc/mvcc-rs/src/database.rs @@ -36,7 +36,7 @@ pub type TxID = u64; /// A log record contains all the versions inserted and deleted by a transaction. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LogRecord { - tx_timestamp: TxID, + pub(crate) tx_timestamp: TxID, row_versions: Vec, } @@ -530,15 +530,9 @@ impl DatabaseInner { } pub async fn recover(&self) -> Result<()> { - use futures::StreamExt; - let tx_log = self - .storage - .read_tx_log() - .await? - .collect::>() - .await; + let tx_log = self.storage.read_tx_log().await?; for record in tx_log { - println!("RECOVERING {:?}", record); + tracing::debug!("RECOVERING {:?}", record); for version in record.row_versions { let mut rows = self.rows.borrow_mut(); let row_versions = rows.entry(version.row.id).or_insert_with(Vec::new); diff --git a/core/mvcc/mvcc-rs/src/persistent_storage.rs b/core/mvcc/mvcc-rs/src/persistent_storage.rs deleted file mode 100644 index f07379981..000000000 --- a/core/mvcc/mvcc-rs/src/persistent_storage.rs +++ /dev/null @@ -1,81 +0,0 @@ -use crate::database::{LogRecord, Result}; -use crate::errors::DatabaseError; - -#[derive(Debug)] -pub enum Storage { - Noop, - JsonOnDisk(std::path::PathBuf), -} - -impl Storage { - pub fn new_noop() -> Self { - Self::Noop - } - - pub fn new_json_on_disk(path: impl Into) -> Self { - let path = path.into(); - Self::JsonOnDisk(path) - } -} - -#[pin_project::pin_project] -pub struct JsonOnDiskStream { - #[pin] - inner: tokio_stream::wrappers::LinesStream>, -} - -impl futures::stream::Stream for JsonOnDiskStream { - type Item = LogRecord; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let this = self.project(); - this.inner - .poll_next(cx) - .map(|x| x.and_then(|x| x.ok().and_then(|x| serde_json::from_str(x.as_str()).ok()))) - } -} - -impl Storage { - pub async fn log_tx(&mut self, m: LogRecord) -> Result<()> { - if let Self::JsonOnDisk(path) = self { - use tokio::io::AsyncWriteExt; - let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?; - let mut file = tokio::fs::OpenOptions::new() - .create(true) - .append(true) - .open(&path) - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - file.write_all(&t) - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - file.write_all(b"\n") - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - } - Ok(()) - } - - pub async fn read_tx_log(&self) -> Result { - if let Self::JsonOnDisk(path) = self { - use tokio::io::AsyncBufReadExt; - let file = tokio::fs::OpenOptions::new() - .read(true) - .open(&path) - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - Ok(JsonOnDiskStream { - inner: tokio_stream::wrappers::LinesStream::new( - tokio::io::BufReader::new(file).lines(), - ), - }) - } else { - Err(crate::errors::DatabaseError::Io( - "cannot read from Noop storage".to_string(), - )) - } - } -} diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs new file mode 100644 index 000000000..1dd72c02a --- /dev/null +++ b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs @@ -0,0 +1,81 @@ +use crate::database::{LogRecord, Result}; +use crate::errors::DatabaseError; + +pub mod s3; + +#[derive(Debug)] +pub enum Storage { + Noop, + JsonOnDisk(std::path::PathBuf), + S3(s3::Replicator), +} + +impl Storage { + pub fn new_noop() -> Self { + Self::Noop + } + + pub fn new_json_on_disk(path: impl Into) -> Self { + let path = path.into(); + Self::JsonOnDisk(path) + } + + pub async fn new_s3(options: s3::Options) -> Result { + Ok(Self::S3(s3::Replicator::new(options).await?)) + } +} + +impl Storage { + pub async fn log_tx(&mut self, m: LogRecord) -> Result<()> { + match self { + Self::JsonOnDisk(path) => { + use tokio::io::AsyncWriteExt; + let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?; + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + file.write_all(&t) + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + file.write_all(b"\n") + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + } + Self::S3(replicator) => { + replicator.replicate_tx(m).await?; + } + Self::Noop => (), + } + Ok(()) + } + + pub async fn read_tx_log(&self) -> Result> { + match self { + Self::JsonOnDisk(path) => { + use tokio::io::AsyncBufReadExt; + let file = tokio::fs::OpenOptions::new() + .read(true) + .open(&path) + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + + let mut records: Vec = Vec::new(); + let mut lines = tokio::io::BufReader::new(file).lines(); + while let Ok(Some(line)) = lines.next_line().await { + records.push( + serde_json::from_str(&line) + .map_err(|e| DatabaseError::Io(e.to_string()))?, + ) + } + Ok(records) + } + Self::S3(replicator) => replicator.read_tx_log().await, + Self::Noop => Err(crate::errors::DatabaseError::Io( + "cannot read from Noop storage".to_string(), + )), + } + } +} diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs b/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs new file mode 100644 index 000000000..836c35363 --- /dev/null +++ b/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs @@ -0,0 +1,136 @@ +use crate::database::{LogRecord, Result}; +use crate::errors::DatabaseError; +use aws_sdk_s3::Client; + +#[derive(Clone, Copy, Debug)] +#[non_exhaustive] +pub struct Options { + pub create_bucket_if_not_exists: bool, +} + +impl Options { + pub fn with_create_bucket_if_not_exists(create_bucket_if_not_exists: bool) -> Self { + Self { + create_bucket_if_not_exists, + } + } +} + +#[derive(Debug)] +pub struct Replicator { + pub client: Client, + pub bucket: String, + pub prefix: String, +} + +impl Replicator { + pub async fn new(options: Options) -> Result { + let mut loader = aws_config::from_env(); + if let Ok(endpoint) = std::env::var("MVCCRS_ENDPOINT") { + loader = loader.endpoint_url(endpoint); + } + let sdk_config = loader.load().await; + let config = aws_sdk_s3::config::Builder::from(&sdk_config) + .force_path_style(true) + .build(); + let bucket = std::env::var("MVCCRS_BUCKET").unwrap_or_else(|_| "mvccrs".to_string()); + let prefix = std::env::var("MVCCRS_PREFIX").unwrap_or_else(|_| "tx".to_string()); + let client = Client::from_conf(config); + + match client.head_bucket().bucket(&bucket).send().await { + Ok(_) => tracing::info!("Bucket {bucket} exists and is accessible"), + Err(aws_sdk_s3::error::SdkError::ServiceError(err)) if err.err().is_not_found() => { + if options.create_bucket_if_not_exists { + tracing::info!("Bucket {bucket} not found, recreating"); + client + .create_bucket() + .bucket(&bucket) + .send() + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + } else { + tracing::error!("Bucket {bucket} does not exist"); + return Err(DatabaseError::Io(err.err().to_string())); + } + } + Err(e) => { + tracing::error!("Bucket checking error: {e}"); + return Err(DatabaseError::Io(e.to_string())); + } + } + + Ok(Self { + client, + bucket, + prefix, + }) + } + + pub async fn replicate_tx(&self, record: LogRecord) -> Result<()> { + let key = format!("{}-{:020}", self.prefix, record.tx_timestamp); + tracing::trace!("Replicating {key}"); + let body = serde_json::to_vec(&record).map_err(|e| DatabaseError::Io(e.to_string()))?; + let resp = self + .client + .put_object() + .bucket(&self.bucket) + .key(&key) + .body(body.into()) + .send() + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + tracing::trace!("Replicator response: {:?}", resp); + Ok(()) + } + + pub async fn read_tx_log(&self) -> Result> { + let mut records: Vec = Vec::new(); + // Read all objects from the bucket, one log record is stored in one object + let mut next_token = None; + loop { + let mut req = self + .client + .list_objects_v2() + .bucket(&self.bucket) + .prefix(&self.prefix); + if let Some(next_token) = next_token { + req = req.continuation_token(next_token); + } + let resp = req + .send() + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + tracing::trace!("List objects response: {:?}", resp); + if let Some(contents) = resp.contents { + // read the record from s3 based on the object metadata (`contents`) + // and store it in the `records` vector + for object in contents { + let key = object.key.unwrap(); + let resp = self + .client + .get_object() + .bucket(&self.bucket) + .key(&key) + .send() + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + tracing::trace!("Get object response: {:?}", resp); + let body = resp + .body + .collect() + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + let record: LogRecord = serde_json::from_slice(&body.into_bytes()) + .map_err(|e| DatabaseError::Io(e.to_string()))?; + records.push(record); + } + } + if resp.next_continuation_token.is_none() { + break; + } + next_token = resp.next_continuation_token; + } + tracing::trace!("Records: {records:?}"); + Ok(records) + } +}