mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-14 20:44:29 +01:00
database: drop RefCell from SkipMap
not needed, the structure is already Send&Sync
This commit is contained in:
@@ -138,7 +138,7 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
/// Creates a new database.
|
||||
pub fn new(clock: Clock, storage: Storage) -> Self {
|
||||
let inner = DatabaseInner {
|
||||
rows: RefCell::new(SkipMap::new()),
|
||||
rows: SkipMap::new(),
|
||||
txs: RefCell::new(HashMap::new()),
|
||||
tx_timestamps: RefCell::new(BTreeMap::new()),
|
||||
tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes
|
||||
@@ -293,7 +293,7 @@ impl<Clock: LogicalClock> Database<Clock> {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseInner<Clock: LogicalClock> {
|
||||
rows: RefCell<SkipMap<RowID, RwLock<Vec<RowVersion>>>>,
|
||||
rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
|
||||
txs: RefCell<HashMap<TxID, Transaction>>,
|
||||
tx_timestamps: RefCell<BTreeMap<u64, usize>>,
|
||||
tx_ids: AtomicU64,
|
||||
@@ -314,8 +314,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
end: None,
|
||||
row,
|
||||
};
|
||||
let rows = self.rows.borrow_mut();
|
||||
let versions = rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
|
||||
let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
|
||||
let mut versions = versions.value().write().unwrap();
|
||||
versions.push(row_version);
|
||||
tx.insert_to_write_set(id);
|
||||
@@ -327,8 +326,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
// NOTICE: They *are* dropped before an await point!!! But the await is conditional,
|
||||
// so I think clippy is just confused.
|
||||
let mut txs = self.txs.borrow_mut();
|
||||
let rows = self.rows.borrow_mut();
|
||||
let row_versions_opt = rows.get(&id);
|
||||
let row_versions_opt = self.rows.get(&id);
|
||||
if let Some(ref row_versions) = row_versions_opt {
|
||||
let mut row_versions = row_versions.value().write().unwrap();
|
||||
for rv in row_versions.iter_mut().rev() {
|
||||
@@ -340,7 +338,6 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
drop(txs);
|
||||
drop(row_versions);
|
||||
drop(row_versions_opt);
|
||||
drop(rows);
|
||||
self.rollback_tx(tx_id);
|
||||
return Err(DatabaseError::WriteWriteConflict);
|
||||
}
|
||||
@@ -361,8 +358,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
let txs = self.txs.borrow_mut();
|
||||
let tx = txs.get(&tx_id).unwrap();
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
let rows = self.rows.borrow();
|
||||
if let Some(row_versions) = rows.get(&id) {
|
||||
if let Some(row_versions) = self.rows.get(&id) {
|
||||
let row_versions = row_versions.value().read().unwrap();
|
||||
for rv in row_versions.iter().rev() {
|
||||
if is_version_visible(&txs, tx, rv) {
|
||||
@@ -375,14 +371,13 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
}
|
||||
|
||||
fn scan_row_ids(&self) -> Result<Vec<RowID>> {
|
||||
let rows = self.rows.borrow();
|
||||
let keys = rows.iter().map(|entry| *entry.key());
|
||||
let keys = self.rows.iter().map(|entry| *entry.key());
|
||||
Ok(keys.collect())
|
||||
}
|
||||
|
||||
fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
|
||||
let rows = &self.rows.borrow();
|
||||
Ok(rows
|
||||
Ok(self
|
||||
.rows
|
||||
.range(
|
||||
RowID {
|
||||
table_id,
|
||||
@@ -418,12 +413,11 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
}
|
||||
}
|
||||
let rows = self.rows.borrow_mut();
|
||||
tx.state = TransactionState::Preparing;
|
||||
tracing::trace!("PREPARE {tx}");
|
||||
let mut log_record: LogRecord = LogRecord::new(end_ts);
|
||||
for id in &tx.write_set {
|
||||
if let Some(row_versions) = rows.get(id) {
|
||||
if let Some(row_versions) = self.rows.get(id) {
|
||||
let mut row_versions = row_versions.value().write().unwrap();
|
||||
for row_version in row_versions.iter_mut() {
|
||||
if let TxTimestampOrID::TxID(id) = row_version.begin {
|
||||
@@ -457,7 +451,6 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
}
|
||||
}
|
||||
txs.remove(&tx_id);
|
||||
drop(rows);
|
||||
drop(txs);
|
||||
if !log_record.row_versions.is_empty() {
|
||||
self.storage.log_tx(log_record)?;
|
||||
@@ -471,13 +464,12 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
assert!(tx.state == TransactionState::Active);
|
||||
tx.state = TransactionState::Aborted;
|
||||
tracing::trace!("ABORT {tx}");
|
||||
let rows = self.rows.borrow_mut();
|
||||
for id in &tx.write_set {
|
||||
if let Some(row_versions) = rows.get(id) {
|
||||
if let Some(row_versions) = self.rows.get(id) {
|
||||
let mut row_versions = row_versions.value().write().unwrap();
|
||||
row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id));
|
||||
if row_versions.is_empty() {
|
||||
rows.remove(id);
|
||||
self.rows.remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -506,9 +498,8 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
fn drop_unused_row_versions(&self) {
|
||||
let txs = self.txs.borrow();
|
||||
let tx_timestamps = self.tx_timestamps.borrow();
|
||||
let rows = self.rows.borrow_mut();
|
||||
let mut to_remove = Vec::new();
|
||||
for entry in rows.iter() {
|
||||
for entry in self.rows.iter() {
|
||||
let mut row_versions = entry.value().write().unwrap();
|
||||
row_versions.retain(|rv| {
|
||||
let should_stay = match rv.end {
|
||||
@@ -543,7 +534,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
}
|
||||
}
|
||||
for id in to_remove {
|
||||
rows.remove(&id);
|
||||
self.rows.remove(&id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -552,9 +543,9 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
|
||||
for record in tx_log {
|
||||
tracing::debug!("RECOVERING {:?}", record);
|
||||
for version in record.row_versions {
|
||||
let rows = self.rows.borrow_mut();
|
||||
let row_versions =
|
||||
rows.get_or_insert_with(version.row.id, || RwLock::new(Vec::new()));
|
||||
let row_versions = self
|
||||
.rows
|
||||
.get_or_insert_with(version.row.id, || RwLock::new(Vec::new()));
|
||||
let mut row_versions = row_versions.value().write().unwrap();
|
||||
row_versions.push(version);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
use super::*;
|
||||
use crate::clock::LocalClock;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
Reference in New Issue
Block a user