Merge pull request #45 from penberg/s3stor

storage: add S3 storage
Pekka Enberg
2023-05-16 16:59:24 +03:00
committed by GitHub
7 changed files with 259 additions and 95 deletions
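
The change, in short: the `Storage` enum grows an S3-backed variant, and the sqlite3 bindings choose a backend at compile time via the new Cargo features below (`json_on_disk_storage`, `s3_storage`). A minimal sketch of driving the new backend directly, assuming a tokio runtime and an S3-compatible endpoint such as a local MinIO; the `MVCCRS_*` variable names and their defaults come from `s3.rs` at the end of this diff:

use mvcc_rs::persistent_storage::{s3, Storage};

#[tokio::main]
async fn main() {
    // Replicator::new reads MVCCRS_ENDPOINT, MVCCRS_BUCKET (default "mvccrs")
    // and MVCCRS_PREFIX (default "tx") from the environment; the local
    // endpoint below is an assumption for illustration.
    std::env::set_var("MVCCRS_ENDPOINT", "http://127.0.0.1:9000");
    let storage = Storage::new_s3(s3::Options::with_create_bucket_if_not_exists(true))
        .await
        .expect("S3 storage setup failed");
    println!("{storage:?}"); // Storage derives Debug
}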

View File

@@ -16,3 +16,8 @@ mvcc-rs = { path = "../../mvcc-rs" }
tokio = { version = "1.27.0", features = ["full", "parking_lot"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0" }
[features]
default = []
json_on_disk_storage = []
s3_storage = []

View File

@@ -5,6 +5,7 @@ mod errors;
mod types;
use errors::MVCCError;
use mvcc_rs::persistent_storage::{s3, Storage};
use mvcc_rs::*;
use types::{DbContext, MVCCDatabaseRef, MVCCScanCursorRef, ScanCursorContext};
@@ -19,6 +20,21 @@ type ScanCursor = cursor::ScanCursor<'static, Clock>;
static INIT_RUST_LOG: std::sync::Once = std::sync::Once::new();
async fn storage_for(main_db_path: &str) -> database::Result<Storage> {
// TODO: let's accept a URL instead of main_db_path here, so we can
// pass custom S3 endpoints, options, etc.
if cfg!(feature = "json_on_disk_storage") {
tracing::info!("JSONonDisk storage stored in {main_db_path}-mvcc");
return Ok(Storage::new_json_on_disk(format!("{main_db_path}-mvcc")));
}
if cfg!(feature = "s3_storage") {
tracing::info!("S3 storage for {main_db_path}");
return Storage::new_s3(s3::Options::with_create_bucket_if_not_exists(true)).await;
}
tracing::info!("No persistent storage for {main_db_path}");
Ok(Storage::new_noop())
}
#[no_mangle]
pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCCDatabaseRef {
INIT_RUST_LOG.call_once(|| {
@@ -28,18 +44,28 @@ pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCC
tracing::debug!("MVCCDatabaseOpen");
let clock = clock::LocalClock::new();
-    let path = unsafe { std::ffi::CStr::from_ptr(path) };
-    let path = match path.to_str() {
+    let main_db_path = unsafe { std::ffi::CStr::from_ptr(path) };
+    let main_db_path = match main_db_path.to_str() {
Ok(path) => path,
Err(_) => {
tracing::error!("Invalid UTF-8 path");
return MVCCDatabaseRef::null();
}
};
tracing::debug!("mvccrs: opening persistent storage at {path}");
let storage = crate::persistent_storage::Storage::new_json_on_disk(path);
let db = Db::new(clock, storage);
let runtime = tokio::runtime::Runtime::new().unwrap();
tracing::debug!("mvccrs: opening persistent storage for {main_db_path}");
let storage = match runtime.block_on(storage_for(main_db_path)) {
Ok(storage) => storage,
Err(e) => {
tracing::error!("Failed to open persistent storage: {e}");
return MVCCDatabaseRef::null();
}
};
let db = Db::new(clock, storage);
runtime.block_on(db.recover()).ok();
let ctx = DbContext { db, runtime };
let ctx = Box::leak(Box::new(ctx));
MVCCDatabaseRef::from(ctx)
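
A note on the branching in `storage_for` above: `cfg!(feature = ...)` expands to a compile-time boolean, so both branches are always type-checked even when a feature is disabled, which is consistent with `aws-sdk-s3` being added as a non-optional dependency in the next hunk. A minimal illustration of the difference from attribute-style gating:

// cfg! keeps the code and branches on a compile-time constant;
// the body must compile even when the feature is off.
fn pick() {
    if cfg!(feature = "s3_storage") {
        // taken only when built with --features s3_storage
    }
}

// #[cfg] removes the item entirely when the feature is off.
#[cfg(feature = "s3_storage")]
fn compiled_only_with_the_feature() {}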

View File

@@ -15,6 +15,9 @@ serde_json = "1.0.96"
pin-project = "1.0.12"
tracing-subscriber = { version = "0", optional = true }
base64 = "0.21.0"
aws-sdk-s3 = "0.27.0"
aws-config = "0.55.2"
tokio-util = "0.7.8"
[dev-dependencies]
criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] }

View File

@@ -36,7 +36,7 @@ pub type TxID = u64;
/// A log record contains all the versions inserted and deleted by a transaction.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogRecord {
-    tx_timestamp: TxID,
+    pub(crate) tx_timestamp: TxID,
row_versions: Vec<RowVersion>,
}
@@ -530,15 +530,9 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
}
pub async fn recover(&self) -> Result<()> {
-        use futures::StreamExt;
-        let tx_log = self
-            .storage
-            .read_tx_log()
-            .await?
-            .collect::<Vec<LogRecord>>()
-            .await;
+        let tx_log = self.storage.read_tx_log().await?;
for record in tx_log {
println!("RECOVERING {:?}", record);
tracing::debug!("RECOVERING {:?}", record);
for version in record.row_versions {
let mut rows = self.rows.borrow_mut();
let row_versions = rows.entry(version.row.id).or_insert_with(Vec::new);
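
The hunk above follows from a signature change in the storage layer (rewritten below): `read_tx_log` now returns a buffered `Vec<LogRecord>` for every backend instead of a stream of JSON lines, so `recover` drops `futures::StreamExt` and iterates directly. A sketch of the resulting loop, with `Storage` and `Result` as defined in this diff:

async fn recover_sketch(storage: &Storage) -> Result<()> {
    for record in storage.read_tx_log().await? {
        tracing::debug!("RECOVERING {:?}", record);
        // re-apply record.row_versions to the in-memory row map
    }
    Ok(())
}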

View File

@@ -1,81 +0,0 @@
use crate::database::{LogRecord, Result};
use crate::errors::DatabaseError;
#[derive(Debug)]
pub enum Storage {
Noop,
JsonOnDisk(std::path::PathBuf),
}
impl Storage {
pub fn new_noop() -> Self {
Self::Noop
}
pub fn new_json_on_disk(path: impl Into<std::path::PathBuf>) -> Self {
let path = path.into();
Self::JsonOnDisk(path)
}
}
#[pin_project::pin_project]
pub struct JsonOnDiskStream {
#[pin]
inner: tokio_stream::wrappers::LinesStream<tokio::io::BufReader<tokio::fs::File>>,
}
impl futures::stream::Stream for JsonOnDiskStream {
type Item = LogRecord;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
this.inner
.poll_next(cx)
.map(|x| x.and_then(|x| x.ok().and_then(|x| serde_json::from_str(x.as_str()).ok())))
}
}
impl Storage {
pub async fn log_tx(&mut self, m: LogRecord) -> Result<()> {
if let Self::JsonOnDisk(path) = self {
use tokio::io::AsyncWriteExt;
let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(&t)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(b"\n")
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
}
Ok(())
}
pub async fn read_tx_log(&self) -> Result<JsonOnDiskStream> {
if let Self::JsonOnDisk(path) = self {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::OpenOptions::new()
.read(true)
.open(&path)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
Ok(JsonOnDiskStream {
inner: tokio_stream::wrappers::LinesStream::new(
tokio::io::BufReader::new(file).lines(),
),
})
} else {
Err(crate::errors::DatabaseError::Io(
"cannot read from Noop storage".to_string(),
))
}
}
}

View File

@@ -0,0 +1,81 @@
use crate::database::{LogRecord, Result};
use crate::errors::DatabaseError;
pub mod s3;
#[derive(Debug)]
pub enum Storage {
Noop,
JsonOnDisk(std::path::PathBuf),
S3(s3::Replicator),
}
impl Storage {
pub fn new_noop() -> Self {
Self::Noop
}
pub fn new_json_on_disk(path: impl Into<std::path::PathBuf>) -> Self {
let path = path.into();
Self::JsonOnDisk(path)
}
pub async fn new_s3(options: s3::Options) -> Result<Self> {
Ok(Self::S3(s3::Replicator::new(options).await?))
}
}
impl Storage {
pub async fn log_tx(&mut self, m: LogRecord) -> Result<()> {
match self {
Self::JsonOnDisk(path) => {
use tokio::io::AsyncWriteExt;
let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(&t)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(b"\n")
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
}
Self::S3(replicator) => {
replicator.replicate_tx(m).await?;
}
Self::Noop => (),
}
Ok(())
}
pub async fn read_tx_log(&self) -> Result<Vec<LogRecord>> {
match self {
Self::JsonOnDisk(path) => {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::OpenOptions::new()
.read(true)
.open(&path)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
let mut records: Vec<LogRecord> = Vec::new();
let mut lines = tokio::io::BufReader::new(file).lines();
while let Ok(Some(line)) = lines.next_line().await {
records.push(
serde_json::from_str(&line)
.map_err(|e| DatabaseError::Io(e.to_string()))?,
)
}
Ok(records)
}
Self::S3(replicator) => replicator.read_tx_log().await,
Self::Noop => Err(crate::errors::DatabaseError::Io(
"cannot read from Noop storage".to_string(),
)),
}
}
}
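
Taken together, the enum gives all three backends a single `log_tx`/`read_tx_log` surface. A crate-internal round-trip sketch (crate-internal because `LogRecord`'s fields are `pub(crate)`; the path is an arbitrary example):

use crate::database::{LogRecord, Result};
use crate::persistent_storage::Storage;

async fn roundtrip(record: LogRecord) -> Result<Vec<LogRecord>> {
    // JsonOnDisk appends one serialized record per line...
    let mut storage = Storage::new_json_on_disk("/tmp/example-mvcc");
    storage.log_tx(record).await?;
    // ...and read_tx_log parses the lines back, oldest first.
    storage.read_tx_log().await
}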

View File

@@ -0,0 +1,136 @@
use crate::database::{LogRecord, Result};
use crate::errors::DatabaseError;
use aws_sdk_s3::Client;
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct Options {
pub create_bucket_if_not_exists: bool,
}
impl Options {
pub fn with_create_bucket_if_not_exists(create_bucket_if_not_exists: bool) -> Self {
Self {
create_bucket_if_not_exists,
}
}
}
#[derive(Debug)]
pub struct Replicator {
pub client: Client,
pub bucket: String,
pub prefix: String,
}
impl Replicator {
pub async fn new(options: Options) -> Result<Self> {
let mut loader = aws_config::from_env();
if let Ok(endpoint) = std::env::var("MVCCRS_ENDPOINT") {
loader = loader.endpoint_url(endpoint);
}
let sdk_config = loader.load().await;
let config = aws_sdk_s3::config::Builder::from(&sdk_config)
.force_path_style(true)
.build();
let bucket = std::env::var("MVCCRS_BUCKET").unwrap_or_else(|_| "mvccrs".to_string());
let prefix = std::env::var("MVCCRS_PREFIX").unwrap_or_else(|_| "tx".to_string());
let client = Client::from_conf(config);
match client.head_bucket().bucket(&bucket).send().await {
Ok(_) => tracing::info!("Bucket {bucket} exists and is accessible"),
Err(aws_sdk_s3::error::SdkError::ServiceError(err)) if err.err().is_not_found() => {
if options.create_bucket_if_not_exists {
tracing::info!("Bucket {bucket} not found, recreating");
client
.create_bucket()
.bucket(&bucket)
.send()
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
} else {
tracing::error!("Bucket {bucket} does not exist");
return Err(DatabaseError::Io(err.err().to_string()));
}
}
Err(e) => {
tracing::error!("Bucket checking error: {e}");
return Err(DatabaseError::Io(e.to_string()));
}
}
Ok(Self {
client,
bucket,
prefix,
})
}
pub async fn replicate_tx(&self, record: LogRecord) -> Result<()> {
let key = format!("{}-{:020}", self.prefix, record.tx_timestamp);
tracing::trace!("Replicating {key}");
let body = serde_json::to_vec(&record).map_err(|e| DatabaseError::Io(e.to_string()))?;
let resp = self
.client
.put_object()
.bucket(&self.bucket)
.key(&key)
.body(body.into())
.send()
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
tracing::trace!("Replicator response: {:?}", resp);
Ok(())
}
pub async fn read_tx_log(&self) -> Result<Vec<LogRecord>> {
let mut records: Vec<LogRecord> = Vec::new();
// Read all objects from the bucket; each object stores one log record.
let mut next_token = None;
loop {
let mut req = self
.client
.list_objects_v2()
.bucket(&self.bucket)
.prefix(&self.prefix);
if let Some(next_token) = next_token {
req = req.continuation_token(next_token);
}
let resp = req
.send()
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
tracing::trace!("List objects response: {:?}", resp);
if let Some(contents) = resp.contents {
// Read each record from S3 based on the object metadata (`contents`)
// and store it in the `records` vector.
for object in contents {
let key = object.key.unwrap();
let resp = self
.client
.get_object()
.bucket(&self.bucket)
.key(&key)
.send()
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
tracing::trace!("Get object response: {:?}", resp);
let body = resp
.body
.collect()
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
let record: LogRecord = serde_json::from_slice(&body.into_bytes())
.map_err(|e| DatabaseError::Io(e.to_string()))?;
records.push(record);
}
}
if resp.next_continuation_token.is_none() {
break;
}
next_token = resp.next_continuation_token;
}
tracing::trace!("Records: {records:?}");
Ok(records)
}
}
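
Two details make `read_tx_log` above correct. First, `ListObjectsV2` returns keys in ascending UTF-8 binary order, and the `{:020}` zero-padding in `replicate_tx` makes that lexicographic order coincide with numeric `tx_timestamp` order, so records come back in commit order without an explicit sort. Second, a single listing call returns at most 1000 keys, hence the continuation-token loop. A tiny standalone check of the key scheme:

fn main() {
    // Same format as replicate_tx: prefix, dash, 20-digit zero-padded timestamp.
    let key = |ts: u64| format!("{}-{:020}", "tx", ts);
    assert_eq!(key(42), "tx-00000000000000000042");
    // Fixed width keeps lexicographic order == numeric order:
    assert!(key(9) < key(10)); // without padding, "tx-9" > "tx-10"
}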