diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs index 0cac45259..e7d17923b 100644 --- a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs +++ b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs @@ -5,13 +5,10 @@ use std::fmt::Debug; use crate::database::{LogRecord, Result}; use crate::errors::DatabaseError; -pub mod s3; - #[derive(Debug)] pub enum Storage { Noop, JsonOnDisk(std::path::PathBuf), - S3(s3::Replicator), } impl Storage { @@ -23,11 +20,6 @@ impl Storage { let path = path.into(); Self::JsonOnDisk(path) } - - pub fn new_s3(options: s3::Options) -> Result { - let replicator = futures::executor::block_on(s3::Replicator::new(options))?; - Ok(Self::S3(replicator)) - } } impl Storage { @@ -46,9 +38,6 @@ impl Storage { file.write_all(b"\n") .map_err(|e| DatabaseError::Io(e.to_string()))?; } - Self::S3(replicator) => { - futures::executor::block_on(replicator.replicate_tx(m))?; - } Self::Noop => (), } Ok(()) diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs b/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs deleted file mode 100644 index cda65fd5e..000000000 --- a/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs +++ /dev/null @@ -1,139 +0,0 @@ -use crate::database::{LogRecord, Result}; -use crate::errors::DatabaseError; -use aws_sdk_s3::Client; -use serde::Serialize; -use serde::de::DeserializeOwned; -use std::fmt::Debug; - -#[derive(Clone, Copy, Debug)] -#[non_exhaustive] -pub struct Options { - pub create_bucket_if_not_exists: bool, -} - -impl Options { - pub fn with_create_bucket_if_not_exists(create_bucket_if_not_exists: bool) -> Self { - Self { - create_bucket_if_not_exists, - } - } -} - -#[derive(Debug)] -pub struct Replicator { - pub client: Client, - pub bucket: String, - pub prefix: String, -} - -impl Replicator { - pub async fn new(options: Options) -> Result { - let mut loader = aws_config::from_env(); - if let Ok(endpoint) = std::env::var("MVCCRS_ENDPOINT") { - loader = loader.endpoint_url(endpoint); - } - let sdk_config = loader.load().await; - let config = aws_sdk_s3::config::Builder::from(&sdk_config) - .force_path_style(true) - .build(); - let bucket = std::env::var("MVCCRS_BUCKET").unwrap_or_else(|_| "mvccrs".to_string()); - let prefix = std::env::var("MVCCRS_PREFIX").unwrap_or_else(|_| "tx".to_string()); - let client = Client::from_conf(config); - - match client.head_bucket().bucket(&bucket).send().await { - Ok(_) => tracing::info!("Bucket {bucket} exists and is accessible"), - Err(aws_sdk_s3::error::SdkError::ServiceError(err)) if err.err().is_not_found() => { - if options.create_bucket_if_not_exists { - tracing::info!("Bucket {bucket} not found, recreating"); - client - .create_bucket() - .bucket(&bucket) - .send() - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - } else { - tracing::error!("Bucket {bucket} does not exist"); - return Err(DatabaseError::Io(err.err().to_string())); - } - } - Err(e) => { - tracing::error!("Bucket checking error: {e}"); - return Err(DatabaseError::Io(e.to_string())); - } - } - - Ok(Self { - client, - bucket, - prefix, - }) - } - - pub async fn replicate_tx(&self, record: LogRecord) -> Result<()> { - let key = format!("{}-{:020}", self.prefix, record.tx_timestamp); - tracing::trace!("Replicating {key}"); - let body = serde_json::to_vec(&record).map_err(|e| DatabaseError::Io(e.to_string()))?; - let resp = self - .client - .put_object() - .bucket(&self.bucket) - .key(&key) - .body(body.into()) - .send() - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - tracing::trace!("Replicator response: {:?}", resp); - Ok(()) - } - - pub async fn read_tx_log(&self) -> Result>> { - let mut records: Vec> = Vec::new(); - // Read all objects from the bucket, one log record is stored in one object - let mut next_token = None; - loop { - let mut req = self - .client - .list_objects_v2() - .bucket(&self.bucket) - .prefix(&self.prefix); - if let Some(next_token) = next_token { - req = req.continuation_token(next_token); - } - let resp = req - .send() - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - tracing::trace!("List objects response: {:?}", resp); - if let Some(contents) = resp.contents { - // read the record from s3 based on the object metadata (`contents`) - // and store it in the `records` vector - for object in contents { - let key = object.key.unwrap(); - let resp = self - .client - .get_object() - .bucket(&self.bucket) - .key(&key) - .send() - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - tracing::trace!("Get object response: {:?}", resp); - let body = resp - .body - .collect() - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - let record: LogRecord = serde_json::from_slice(&body.into_bytes()) - .map_err(|e| DatabaseError::Io(e.to_string()))?; - records.push(record); - } - } - if resp.next_continuation_token.is_none() { - break; - } - next_token = resp.next_continuation_token; - } - tracing::trace!("Records: {records:?}"); - Ok(records) - } -}