mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 00:45:37 +01:00
database: actually implement upserts
Fixes #55 - it was the code that should have been there in the first place, but I forgot to `git add`...
This commit is contained in:
@@ -316,7 +316,7 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
.unwrap_or(0);
|
||||
if versions.len() - position > 3 {
|
||||
tracing::debug!(
|
||||
"Inserting an element {} positions from the end",
|
||||
"Inserting a row version {} positions from the end",
|
||||
versions.len() - position
|
||||
);
|
||||
}
|
||||
@@ -339,7 +339,7 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
.get(&tx_id)
|
||||
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
|
||||
let mut tx = tx.value().write().unwrap();
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
let id = row.id;
|
||||
let row_version = RowVersion {
|
||||
begin: TxTimestampOrID::TxID(tx.tx_id),
|
||||
@@ -379,9 +379,9 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
}
|
||||
|
||||
/// Inserts a row in the database with new values, previously deleting
|
||||
/// any old data if it existed.
|
||||
/// any old data if it existed. Bails on a delete error, e.g. write-write conflict.
|
||||
pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> {
|
||||
self.delete(tx_id, row.id).ok();
|
||||
self.delete(tx_id, row.id)?;
|
||||
self.insert(tx_id, row)
|
||||
}
|
||||
|
||||
@@ -409,7 +409,7 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
.get(&tx_id)
|
||||
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
|
||||
let tx = tx.value().read().unwrap();
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
if is_write_write_conflict(&self.txs, &tx, rv) {
|
||||
drop(row_versions);
|
||||
drop(row_versions_opt);
|
||||
@@ -452,7 +452,7 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
|
||||
let tx = self.txs.get(&tx_id).unwrap();
|
||||
let tx = tx.value().read().unwrap();
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
if let Some(row_versions) = self.rows.get(&id) {
|
||||
let row_versions = row_versions.value().read().unwrap();
|
||||
for rv in row_versions.iter().rev() {
|
||||
@@ -520,7 +520,7 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
match tx.state.load() {
|
||||
TransactionState::Terminated => return Err(DatabaseError::TxTerminated),
|
||||
_ => {
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
}
|
||||
}
|
||||
tx.state.store(TransactionState::Preparing);
|
||||
@@ -660,7 +660,7 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
pub fn rollback_tx(&self, tx_id: TxID) {
|
||||
let tx_unlocked = self.txs.get(&tx_id).unwrap();
|
||||
let tx = tx_unlocked.value().write().unwrap();
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
tx.state.store(TransactionState::Aborted);
|
||||
tracing::trace!("ABORT {tx}");
|
||||
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
|
||||
@@ -677,6 +677,9 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
let tx = tx_unlocked.value().write().unwrap();
|
||||
tx.state.store(TransactionState::Terminated);
|
||||
tracing::trace!("TERMINATE {tx}");
|
||||
// FIXME: verify that we can already remove the transaction here!
|
||||
// Maybe it's fine for snapshot isolation, but too early for serializable?
|
||||
self.txs.remove(&tx_id);
|
||||
}
|
||||
|
||||
/// Generates next unique transaction id
|
||||
@@ -693,27 +696,33 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
/// which sometimes leaves versions intact for too long.
|
||||
/// Returns the number of removed versions.
|
||||
pub fn drop_unused_row_versions(&self) -> usize {
|
||||
tracing::debug!(
|
||||
"transactions: {}; rows: {}",
|
||||
tracing::trace!(
|
||||
"Dropping unused row versions. Database stats: transactions: {}; rows: {}",
|
||||
self.txs.len(),
|
||||
self.rows.len()
|
||||
);
|
||||
let mut dropped = 0;
|
||||
let mut to_remove = Vec::new();
|
||||
for entry in self.rows.iter() {
|
||||
let mut row_versions = entry.value().write().unwrap();
|
||||
tracing::debug!("versions: {}", row_versions.len());
|
||||
row_versions.retain(|rv| {
|
||||
tracing::debug!("inspecting {rv:?}");
|
||||
// FIXME: should take rv.begin into account as well
|
||||
let should_stay = match rv.end {
|
||||
Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
|
||||
// a transaction started before this row version ended,
|
||||
// ergo row version is needed
|
||||
// a transaction started before this row version ended, ergo row version is needed
|
||||
// NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable
|
||||
self.txs
|
||||
.iter()
|
||||
.any(|tx| version_end_ts >= tx.value().read().unwrap().begin_ts)
|
||||
self.txs.iter().any(|tx| {
|
||||
let tx = tx.value().read().unwrap();
|
||||
// FIXME: verify!
|
||||
match tx.state.load() {
|
||||
TransactionState::Active | TransactionState::Preparing => {
|
||||
version_end_ts > tx.begin_ts
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
})
|
||||
}
|
||||
// Let's skip potentially complex logic if the transaction is still
|
||||
// Let's skip potentially complex logic if the transafction is still
|
||||
// active/tracked. We will drop the row version when the transaction
|
||||
// gets garbage-collected itself, it will always happen eventually.
|
||||
Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id),
|
||||
@@ -721,6 +730,7 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
None => true,
|
||||
};
|
||||
if !should_stay {
|
||||
dropped += 1;
|
||||
tracing::trace!(
|
||||
"Dropping row version {:?} {:?}-{:?}",
|
||||
entry.key(),
|
||||
@@ -734,7 +744,6 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
to_remove.push(*entry.key());
|
||||
}
|
||||
}
|
||||
let dropped = to_remove.len();
|
||||
for id in to_remove {
|
||||
self.rows.remove(&id);
|
||||
}
|
||||
@@ -766,11 +775,8 @@ pub(crate) fn is_write_write_conflict(
|
||||
let te = txs.get(&rv_end).unwrap();
|
||||
let te = te.value().read().unwrap();
|
||||
match te.state.load() {
|
||||
TransactionState::Active => tx.tx_id != te.tx_id,
|
||||
TransactionState::Preparing => todo!(),
|
||||
TransactionState::Committed(_end_ts) => todo!(),
|
||||
TransactionState::Aborted => todo!(),
|
||||
TransactionState::Terminated => todo!(),
|
||||
TransactionState::Active | TransactionState::Preparing => tx.tx_id != te.tx_id,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
Some(TxTimestampOrID::Timestamp(_)) => false,
|
||||
|
||||
@@ -82,6 +82,7 @@ fn test_overlapping_concurrent_inserts_read_your_writes() {
|
||||
let work = |prefix: &'static str| {
|
||||
let db = db.clone();
|
||||
std::thread::spawn(move || {
|
||||
let mut failed_upserts = 0;
|
||||
for i in 0..iterations {
|
||||
if i % 1000 == 0 {
|
||||
tracing::debug!("{prefix}: {i}");
|
||||
@@ -100,11 +101,19 @@ fn test_overlapping_concurrent_inserts_read_your_writes() {
|
||||
id,
|
||||
data: format!("{prefix} @{tx}"),
|
||||
};
|
||||
db.upsert(tx, row.clone()).unwrap();
|
||||
if let Err(e) = db.upsert(tx, row.clone()) {
|
||||
tracing::trace!("upsert failed: {e}");
|
||||
failed_upserts += 1;
|
||||
continue;
|
||||
}
|
||||
let committed_row = db.read(tx, id).unwrap();
|
||||
db.commit_tx(tx).unwrap();
|
||||
assert_eq!(committed_row, Some(row));
|
||||
}
|
||||
tracing::info!(
|
||||
"{prefix}'s failed upserts: {failed_upserts}/{iterations} {:.2}%",
|
||||
(failed_upserts * 100) as f64 / iterations as f64
|
||||
);
|
||||
})
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user