diff --git a/core/mvcc/bindings/c/src/lib.rs b/core/mvcc/bindings/c/src/lib.rs
index 509fa94cf..7d222a8d2 100644
--- a/core/mvcc/bindings/c/src/lib.rs
+++ b/core/mvcc/bindings/c/src/lib.rs
@@ -13,10 +13,12 @@ use types::{DbContext, MVCCDatabaseRef, MVCCScanCursorRef, ScanCursorContext};
 type Clock = clock::LocalClock;
 
 /// cbindgen:ignore
-type Db = database::Database<Clock>;
+/// Note - We use String type in C bindings as Row type. Type is generic.
+type Db = database::Database<Clock, String>;
 
 /// cbindgen:ignore
-type ScanCursor = cursor::ScanCursor<'static, Clock>;
+/// Note - We use String type in C bindings as Row type. Type is generic.
+type ScanCursor = cursor::ScanCursor<'static, Clock, String>;
 
 static INIT_RUST_LOG: std::sync::Once = std::sync::Once::new();
 
diff --git a/core/mvcc/mvcc-rs/src/cursor.rs b/core/mvcc/mvcc-rs/src/cursor.rs
index 7042c090f..d02e49541 100644
--- a/core/mvcc/mvcc-rs/src/cursor.rs
+++ b/core/mvcc/mvcc-rs/src/cursor.rs
@@ -1,20 +1,24 @@
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+
 use crate::clock::LogicalClock;
 use crate::database::{Database, Result, Row, RowID};
+use std::fmt::Debug;
 
 #[derive(Debug)]
-pub struct ScanCursor<'a, Clock: LogicalClock> {
-    pub db: &'a Database<Clock>,
+pub struct ScanCursor<'a, Clock: LogicalClock, T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug> {
+    pub db: &'a Database<Clock, T>,
     pub row_ids: Vec<RowID>,
     pub index: usize,
     tx_id: u64,
 }
 
-impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> {
+impl<'a, Clock: LogicalClock, T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug> ScanCursor<'a, Clock, T> {
     pub fn new(
-        db: &'a Database<Clock>,
+        db: &'a Database<Clock, T>,
         tx_id: u64,
         table_id: u64,
-    ) -> Result<ScanCursor<'a, Clock>> {
+    ) -> Result<ScanCursor<'a, Clock, T>> {
         let row_ids = db.scan_row_ids_for_table(table_id)?;
         Ok(Self {
             db,
@@ -31,7 +35,7 @@ impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> {
         Some(self.row_ids[self.index])
     }
 
-    pub fn current_row(&self) -> Result<Option<Row>> {
+    pub fn current_row(&self) -> Result<Option<Row<T>>> {
         if self.index >= self.row_ids.len() {
             return Ok(None);
         }
diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs
index 4bdde3b12..9bdade7fd 100644
--- a/core/mvcc/mvcc-rs/src/database/mod.rs
+++ b/core/mvcc/mvcc-rs/src/database/mod.rs
@@ -2,7 +2,9 @@ use crate::clock::LogicalClock;
 use crate::errors::DatabaseError;
 use crate::persistent_storage::Storage;
 use crossbeam_skiplist::{SkipMap, SkipSet};
+use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::RwLock;
 
@@ -19,29 +21,29 @@
 
 #[derive(Clone, Debug, PartialEq, PartialOrd, Serialize, Deserialize)]
-pub struct Row {
+pub struct Row<T> {
     pub id: RowID,
-    pub data: String,
+    pub data: T,
 }
 
 /// A row version.
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct RowVersion {
+pub struct RowVersion<T> {
     begin: TxTimestampOrID,
     end: Option<TxTimestampOrID>,
-    row: Row,
+    row: Row<T>,
 }
 
 pub type TxID = u64;
 
 /// A log record contains all the versions inserted and deleted by a transaction.
 #[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct LogRecord {
+pub struct LogRecord<T> {
     pub(crate) tx_timestamp: TxID,
-    row_versions: Vec<RowVersion>,
+    row_versions: Vec<RowVersion<T>>,
 }
 
-impl LogRecord {
+impl<T> LogRecord<T> {
     fn new(tx_timestamp: TxID) -> Self {
         Self {
             tx_timestamp,
@@ -254,15 +256,22 @@ impl AtomicTransactionState {
     }
 }
 
 #[derive(Debug)]
-pub struct Database<Clock: LogicalClock> {
-    rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
+pub struct Database<
+    Clock: LogicalClock,
+    T: Sync + Send + Clone + Serialize + Debug + DeserializeOwned,
+> {
+    rows: SkipMap<RowID, RwLock<Vec<RowVersion<T>>>>,
     txs: SkipMap<TxID, RwLock<Transaction>>,
     tx_ids: AtomicU64,
     clock: Clock,
     storage: Storage,
 }
 
-impl<Clock: LogicalClock> Database<Clock> {
+impl<
+        Clock: LogicalClock,
+        T: Sync + Send + Clone + Serialize + Debug + DeserializeOwned,
+    > Database<Clock, T>
+{
     /// Creates a new database.
     pub fn new(clock: Clock, storage: Storage) -> Self {
         Self {
@@ -292,7 +301,7 @@ impl<Clock: LogicalClock> Database<Clock> {
 
     /// Inserts a new row version into the database, while making sure that
     /// the row version is inserted in the correct order.
-    fn insert_version(&self, id: RowID, row_version: RowVersion) {
+    fn insert_version(&self, id: RowID, row_version: RowVersion<T>) {
         let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
         let mut versions = versions.value().write().unwrap();
         self.insert_version_raw(&mut versions, row_version)
@@ -300,7 +309,7 @@ impl<Clock: LogicalClock> Database<Clock> {
 
     /// Inserts a new row version into the internal data structure for versions,
     /// while making sure that the row version is inserted in the correct order.
-    fn insert_version_raw(&self, versions: &mut Vec<RowVersion>, row_version: RowVersion) {
+    fn insert_version_raw(&self, versions: &mut Vec<RowVersion<T>>, row_version: RowVersion<T>) {
         // NOTICE: this is an insert a'la insertion sort, with pessimistic linear complexity.
         // However, we expect the number of versions to be nearly sorted, so we deem it worthy
         // to search linearly for the insertion point instead of paying the price of using
@@ -333,7 +342,7 @@ impl<Clock: LogicalClock> Database<Clock> {
     /// * `tx_id` - the ID of the transaction in which to insert the new row.
     /// * `row` - the row object containing the values to be inserted.
     ///
-    pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
+    pub fn insert(&self, tx_id: TxID, row: Row<T>) -> Result<()> {
         let tx = self
             .txs
             .get(&tx_id)
@@ -370,7 +379,7 @@ impl<Clock: LogicalClock> Database<Clock> {
     /// # Returns
     ///
     /// Returns `true` if the row was successfully updated, and `false` otherwise.
-    pub fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
+    pub fn update(&self, tx_id: TxID, row: Row<T>) -> Result<bool> {
         if !self.delete(tx_id, row.id)? {
             return Ok(false);
         }
@@ -380,7 +389,7 @@ impl<Clock: LogicalClock> Database<Clock> {
 
     /// Inserts a row in the database with new values, previously deleting
     /// any old data if it existed. Bails on a delete error, e.g. write-write conflict.
-    pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> {
+    pub fn upsert(&self, tx_id: TxID, row: Row<T>) -> Result<()> {
         self.delete(tx_id, row.id)?;
         self.insert(tx_id, row)
     }
@@ -449,7 +458,7 @@ impl<Clock: LogicalClock> Database<Clock> {
     ///
     /// Returns `Some(row)` with the row data if the row with the given `id` exists,
    /// and `None` otherwise.
-    pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
+    pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row<T>>> {
         let tx = self.txs.get(&tx_id).unwrap();
         let tx = tx.value().read().unwrap();
         assert_eq!(tx.state, TransactionState::Active);
@@ -606,7 +615,7 @@ impl<Clock: LogicalClock> Database<Clock> {
         drop(tx);
         // Postprocessing: inserting row versions and logging the transaction to persistent storage.
         // TODO: we should probably save to persistent storage first, and only then update the in-memory structures.
-        let mut log_record: LogRecord = LogRecord::new(end_ts);
+        let mut log_record: LogRecord<T> = LogRecord::new(end_ts);
         for ref id in write_set {
             if let Some(row_versions) = self.rows.get(id) {
                 let mut row_versions = row_versions.value().write().unwrap();
@@ -665,15 +674,18 @@ impl<Clock: LogicalClock> Database<Clock> {
         tracing::trace!("ABORT {tx}");
         let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
         drop(tx);
+
         for ref id in write_set {
             if let Some(row_versions) = self.rows.get(id) {
                 let mut row_versions = row_versions.value().write().unwrap();
                 row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id));
                 if row_versions.is_empty() {
-                    self.rows.remove(id);
+                    // !TODO! FIXME! This is a bug, because we can't remove the row here!
+                    // self.rows.remove(id);
                 }
             }
         }
+
         let tx = tx_unlocked.value().write().unwrap();
         tx.state.store(TransactionState::Terminated);
         tracing::trace!("TERMINATE {tx}");
@@ -745,7 +757,8 @@ impl<Clock: LogicalClock> Database<Clock> {
             }
         }
         for id in to_remove {
-            self.rows.remove(&id);
+            // !TODO! FIXME! This is a bug, because we can't remove the row here!
+            // self.rows.remove(&id);
         }
         dropped
     }
@@ -765,10 +778,10 @@ impl<Clock: LogicalClock> Database<Clock> {
 
 /// A write-write conflict happens when transaction T_m attempts to update a
 /// row version that is currently being updated by an active transaction T_n.
-pub(crate) fn is_write_write_conflict(
+pub(crate) fn is_write_write_conflict<T: Sync + Send + Clone + Serialize + Debug + DeserializeOwned>(
     txs: &SkipMap<TxID, RwLock<Transaction>>,
     tx: &Transaction,
-    rv: &RowVersion,
+    rv: &RowVersion<T>,
 ) -> bool {
     match rv.end {
         Some(TxTimestampOrID::TxID(rv_end)) => {
@@ -784,18 +797,18 @@ pub(crate) fn is_write_write_conflict(
     }
 }
 
-pub(crate) fn is_version_visible(
+pub(crate) fn is_version_visible<T: Sync + Send + Clone + Serialize + Debug + DeserializeOwned>(
     txs: &SkipMap<TxID, RwLock<Transaction>>,
     tx: &Transaction,
-    rv: &RowVersion,
+    rv: &RowVersion<T>,
 ) -> bool {
     is_begin_visible(txs, tx, rv) && is_end_visible(txs, tx, rv)
 }
 
-fn is_begin_visible(
+fn is_begin_visible<T: Sync + Send + Clone + Serialize + Debug + DeserializeOwned>(
     txs: &SkipMap<TxID, RwLock<Transaction>>,
     tx: &Transaction,
-    rv: &RowVersion,
+    rv: &RowVersion<T>,
 ) -> bool {
     match rv.begin {
         TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts,
@@ -822,10 +835,10 @@ fn is_begin_visible(
     }
 }
 
-fn is_end_visible(
+fn is_end_visible<T: Sync + Send + Clone + Serialize + Debug + DeserializeOwned>(
     txs: &SkipMap<TxID, RwLock<Transaction>>,
     tx: &Transaction,
-    rv: &RowVersion,
+    rv: &RowVersion<T>,
 ) -> bool {
     match rv.end {
         Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts,
diff --git a/core/mvcc/mvcc-rs/src/database/tests.rs b/core/mvcc/mvcc-rs/src/database/tests.rs
index e9023c76a..225c34a0e 100644
--- a/core/mvcc/mvcc-rs/src/database/tests.rs
+++ b/core/mvcc/mvcc-rs/src/database/tests.rs
@@ -2,7 +2,7 @@ use super::*;
 use crate::clock::LocalClock;
 use tracing_test::traced_test;
 
-fn test_db() -> Database<LocalClock> {
+fn test_db() -> Database<LocalClock, String> {
     let clock = LocalClock::new();
     let storage = crate::persistent_storage::Storage::new_noop();
     Database::new(clock, storage)
@@ -721,7 +721,7 @@ fn test_storage1() {
 
     let clock = LocalClock::new();
     let storage = crate::persistent_storage::Storage::new_json_on_disk(path);
-    let db = Database::new(clock, storage);
+    let db: Database<LocalClock, String> = Database::new(clock, storage);
     db.recover().unwrap();
     println!("{:#?}", db);
diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs
index 185a432ee..0cac45259 100644
--- a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs
+++ b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs
@@ -1,3 +1,7 @@
+use serde::Serialize;
+use serde::de::DeserializeOwned;
+use std::fmt::Debug;
+
 use crate::database::{LogRecord, Result};
 use crate::errors::DatabaseError;
 
@@ -27,7 +31,7 @@ impl Storage {
 }
 
 impl Storage {
-    pub fn log_tx(&self, m: LogRecord) -> Result<()> {
+    pub fn log_tx<T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug>(&self, m: LogRecord<T>) -> Result<()> {
         match self {
             Self::JsonOnDisk(path) => {
                 use std::io::Write;
@@ -50,7 +54,7 @@ impl Storage {
         Ok(())
     }
 
-    pub fn read_tx_log(&self) -> Result<Vec<LogRecord>> {
+    pub fn read_tx_log<T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug>(&self) -> Result<Vec<LogRecord<T>>> {
         match self {
             Self::JsonOnDisk(path) => {
                 use std::io::BufRead;
@@ -59,7 +63,7 @@ impl Storage {
                     .open(path)
                     .map_err(|e| DatabaseError::Io(e.to_string()))?;
 
-                let mut records: Vec<LogRecord> = Vec::new();
+                let mut records: Vec<LogRecord<T>> = Vec::new();
                 let mut lines = std::io::BufReader::new(file).lines();
                 while let Some(Ok(line)) = lines.next() {
                     records.push(
diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs b/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs
index 836c35363..cda65fd5e 100644
--- a/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs
+++ b/core/mvcc/mvcc-rs/src/persistent_storage/s3.rs
@@ -1,6 +1,9 @@
 use crate::database::{LogRecord, Result};
 use crate::errors::DatabaseError;
 use aws_sdk_s3::Client;
+use serde::Serialize;
+use serde::de::DeserializeOwned;
+use std::fmt::Debug;
 
 #[derive(Clone, Copy, Debug)]
 #[non_exhaustive]
@@ -66,7 +69,7 @@ impl Replicator {
         })
     }
 
-    pub async fn replicate_tx(&self, record: LogRecord) -> Result<()> {
+    pub async fn replicate_tx<T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug>(&self, record: LogRecord<T>) -> Result<()> {
         let key = format!("{}-{:020}", self.prefix, record.tx_timestamp);
         tracing::trace!("Replicating {key}");
         let body = serde_json::to_vec(&record).map_err(|e| DatabaseError::Io(e.to_string()))?;
@@ -83,8 +86,8 @@ impl Replicator {
         Ok(())
     }
 
-    pub async fn read_tx_log(&self) -> Result<Vec<LogRecord>> {
-        let mut records: Vec<LogRecord> = Vec::new();
+    pub async fn read_tx_log<T: Sync + Send + Clone + Serialize + DeserializeOwned + Debug>(&self) -> Result<Vec<LogRecord<T>>> {
+        let mut records: Vec<LogRecord<T>> = Vec::new();
         // Read all objects from the bucket, one log record is stored in one object
         let mut next_token = None;
         loop {
@@ -120,7 +123,7 @@ impl Replicator {
                     .collect()
                     .await
                     .map_err(|e| DatabaseError::Io(e.to_string()))?;
-                let record: LogRecord = serde_json::from_slice(&body.into_bytes())
+                let record: LogRecord<T> = serde_json::from_slice(&body.into_bytes())
                     .map_err(|e| DatabaseError::Io(e.to_string()))?;
                 records.push(record);
             }
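With this patch, the row payload type becomes a generic parameter on Database, ScanCursor, Row, RowVersion, and LogRecord, bounded by Sync + Send + Clone + Serialize + DeserializeOwned + Debug; the C bindings simply pin that parameter to String. A minimal usage sketch of the generic API follows (not part of the diff). The Account payload type is invented for illustration, and the crate path mvcc_rs, the begin_tx/commit_tx calls, and the RowID field names are assumptions about the surrounding crate rather than something this patch shows.

    use mvcc_rs::clock::LocalClock;
    use mvcc_rs::database::{Database, Row, RowID};
    use mvcc_rs::persistent_storage::Storage;
    use serde::{Deserialize, Serialize};

    // Hypothetical payload type; anything Sync + Send + Clone + Serialize + DeserializeOwned + Debug fits the new bounds.
    #[derive(Clone, Debug, Serialize, Deserialize)]
    struct Account {
        name: String,
        balance: i64,
    }

    fn main() {
        // Same constructor as before; only the second type parameter is new.
        let db: Database<LocalClock, Account> = Database::new(LocalClock::new(), Storage::new_noop());

        // Assumed transaction API (begin_tx/insert/commit_tx) and assumed RowID fields (table_id, row_id).
        let tx = db.begin_tx();
        db.insert(
            tx,
            Row {
                id: RowID { table_id: 1, row_id: 1 },
                data: Account { name: "alice".to_string(), balance: 100 },
            },
        )
        .unwrap();
        db.commit_tx(tx).unwrap();
    }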