From d808db6af987ab3ea05cf1fe6e5e68447eeb8f99 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 20 Nov 2025 09:51:01 +0200 Subject: [PATCH] core: Switch to parking_lot::Mutex It's faster and we eliminate bunch of unwrap() calls. --- core/ext/mod.rs | 7 ++--- core/incremental/aggregate_operator.rs | 5 ++-- core/incremental/cursor.rs | 7 +++-- core/incremental/filter_operator.rs | 7 +++-- core/incremental/input_operator.rs | 3 +- core/incremental/join_operator.rs | 5 ++-- core/incremental/merge_operator.rs | 3 +- core/incremental/operator.rs | 16 ++++++----- core/incremental/project_operator.rs | 7 +++-- core/incremental/view.rs | 3 +- core/io/memory.rs | 7 +++-- core/lib.rs | 28 ++++++++----------- .../mvcc/database/checkpoint_state_machine.rs | 10 ++----- core/mvcc/database/mod.rs | 6 ++-- core/mvcc/database/tests.rs | 2 +- core/schema.rs | 2 +- core/storage/btree.rs | 11 ++------ core/storage/pager.rs | 6 ++-- core/storage/wal.rs | 14 +++++----- core/translate/planner.rs | 2 +- core/translate/pragma.rs | 2 +- core/util.rs | 7 ++--- core/vdbe/builder.rs | 2 +- core/vdbe/execute.rs | 8 +++--- core/vdbe/mod.rs | 4 +-- 25 files changed, 81 insertions(+), 93 deletions(-) diff --git a/core/ext/mod.rs b/core/ext/mod.rs index 333767402..a5bd7e745 100644 --- a/core/ext/mod.rs +++ b/core/ext/mod.rs @@ -15,9 +15,10 @@ use crate::{vtab::VirtualTable, SymbolTable}; use crate::{LimboError, IO}; #[cfg(feature = "fs")] pub use dynamic::{add_builtin_vfs_extensions, add_vfs_module, list_vfs_modules, VfsMod}; +use parking_lot::Mutex; use std::{ ffi::{c_char, c_void, CStr, CString}, - sync::{Arc, Mutex}, + sync::Arc, }; use turso_ext::{ ExtensionApi, InitAggFunction, ResultCode, ScalarFunction, VTabKind, VTabModuleImpl, @@ -65,9 +66,7 @@ pub(crate) unsafe extern "C" fn register_vtab_module( // Use the schema handler to insert the table let table = Arc::new(Table::Virtual(vtab)); let mutex = &*(ext_ctx.schema as *mut Mutex>); - let Ok(guard) = mutex.lock() else { - return ResultCode::Error; - }; + let guard = mutex.lock(); let schema_ptr = Arc::as_ptr(&*guard) as *mut Schema; (*schema_ptr).tables.insert(name_str, table); } else { diff --git a/core/incremental/aggregate_operator.rs b/core/incremental/aggregate_operator.rs index a6d0fb5b8..f80dc1884 100644 --- a/core/incremental/aggregate_operator.rs +++ b/core/incremental/aggregate_operator.rs @@ -10,9 +10,10 @@ use crate::incremental::persistence::{ReadRecord, WriteRow}; use crate::storage::btree::CursorTrait; use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult, ValueRef}; use crate::{return_and_restore_if_io, return_if_io, LimboError, Result, Value}; +use parking_lot::Mutex; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{self, Display}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; // Architecture of the Aggregate Operator // ======================================== @@ -1492,7 +1493,7 @@ impl AggregateOperator { // Process each change in the delta for (row, weight) in delta.changes.iter() { if let Some(tracker) = &self.tracker { - tracker.lock().unwrap().record_aggregation(); + tracker.lock().record_aggregation(); } // Extract group key diff --git a/core/incremental/cursor.rs b/core/incremental/cursor.rs index 470f6bbc1..c6dcb2cd4 100644 --- a/core/incremental/cursor.rs +++ b/core/incremental/cursor.rs @@ -9,7 +9,8 @@ use crate::{ types::{IOResult, SeekKey, SeekOp, SeekResult, Value}, LimboError, Pager, Result, }; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; /// State machine for seek operations #[derive(Debug)] @@ -89,7 +90,7 @@ impl MaterializedViewCursor { } // Get the view and the current transaction state - let mut view_guard = self.view.lock().unwrap(); + let mut view_guard = self.view.lock(); let table_deltas = self.tx_state.get_table_deltas(); // Process the deltas through the circuit to get materialized changes @@ -350,7 +351,7 @@ mod tests { ))?; // Get the view's root page - let view = view_mutex.lock().unwrap(); + let view = view_mutex.lock(); let root_page = view.get_root_page(); if root_page == 0 { return Err(crate::LimboError::InternalError( diff --git a/core/incremental/filter_operator.rs b/core/incremental/filter_operator.rs index 5b9c7e5d9..d85640cb8 100644 --- a/core/incremental/filter_operator.rs +++ b/core/incremental/filter_operator.rs @@ -8,7 +8,8 @@ use crate::incremental::operator::{ }; use crate::types::IOResult; use crate::{Result, Value}; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; /// Filter predicate for filtering rows #[derive(Debug, Clone)] @@ -260,7 +261,7 @@ impl IncrementalOperator for FilterOperator { // Process the delta through the filter for (row, weight) in delta.changes { if let Some(tracker) = &self.tracker { - tracker.lock().unwrap().record_filter(); + tracker.lock().record_filter(); } // Only pass through rows that satisfy the filter predicate @@ -292,7 +293,7 @@ impl IncrementalOperator for FilterOperator { // Only pass through and track rows that satisfy the filter predicate for (row, weight) in deltas.left.changes { if let Some(tracker) = &self.tracker { - tracker.lock().unwrap().record_filter(); + tracker.lock().record_filter(); } // Only track and output rows that pass the filter diff --git a/core/incremental/input_operator.rs b/core/incremental/input_operator.rs index b9a6eeb01..ff8db63dc 100644 --- a/core/incremental/input_operator.rs +++ b/core/incremental/input_operator.rs @@ -7,7 +7,8 @@ use crate::incremental::operator::{ }; use crate::types::IOResult; use crate::Result; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; /// Input operator - source of data for the circuit /// Represents base relations/tables that receive external updates diff --git a/core/incremental/join_operator.rs b/core/incremental/join_operator.rs index 982545ca9..bc95416e9 100644 --- a/core/incremental/join_operator.rs +++ b/core/incremental/join_operator.rs @@ -9,7 +9,8 @@ use crate::incremental::persistence::WriteRow; use crate::storage::btree::CursorTrait; use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult}; use crate::{return_and_restore_if_io, return_if_io, Result, Value}; -use std::sync::{Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::Arc; #[derive(Debug, Clone, PartialEq)] pub enum JoinType { @@ -499,7 +500,7 @@ impl JoinOperator { if Self::sql_keys_equal(&left_key, &right_key) { if let Some(tracker) = &self.tracker { - tracker.lock().unwrap().record_join_lookup(); + tracker.lock().record_join_lookup(); } // Combine the rows diff --git a/core/incremental/merge_operator.rs b/core/incremental/merge_operator.rs index 85a532f6d..d9c359ca7 100644 --- a/core/incremental/merge_operator.rs +++ b/core/incremental/merge_operator.rs @@ -7,10 +7,11 @@ use crate::incremental::operator::{ }; use crate::types::IOResult; use crate::Result; +use parking_lot::Mutex; use std::collections::{hash_map::DefaultHasher, HashMap}; use std::fmt::{self, Display}; use std::hash::{Hash, Hasher}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; /// How the merge operator should handle rowids when combining deltas #[derive(Debug, Clone)] diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 5d577a552..44d1f80d4 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -15,8 +15,9 @@ use crate::schema::{Index, IndexColumn}; use crate::storage::btree::BTreeCursor; use crate::types::IOResult; use crate::Result; +use parking_lot::Mutex; use std::fmt::Debug; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; /// Struct to hold both table and index cursors for DBSP state operations pub struct DbspStateCursors { @@ -263,7 +264,8 @@ mod tests { use crate::util::IOExt; use crate::Value; use crate::{Database, MemoryIO, IO}; - use std::sync::{Arc, Mutex}; + use parking_lot::Mutex; + use std::sync::Arc; /// Create a test pager for operator tests with both table and index fn create_test_pager() -> (std::sync::Arc, i64, i64) { @@ -702,7 +704,7 @@ mod tests { .unwrap(); // Reset tracker for delta processing - tracker.lock().unwrap().aggregation_updates = 0; + tracker.lock().aggregation_updates = 0; // Add one item to category 'cat_0' let mut delta = Delta::new(); @@ -720,7 +722,7 @@ mod tests { .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); - assert_eq!(tracker.lock().unwrap().aggregation_updates, 1); + assert_eq!(tracker.lock().aggregation_updates, 1); // Check the final state - cat_0 should now have count 11 let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); @@ -732,7 +734,7 @@ mod tests { assert_eq!(cat_0.0.values[1], Value::Integer(11)); // Verify incremental behavior - we process the delta twice (eval + commit) - let t = tracker.lock().unwrap(); + let t = tracker.lock(); assert_incremental(&t, 2, 101); } @@ -804,7 +806,7 @@ mod tests { assert_eq!(widget_sum.values[1], Value::Integer(250)); // Reset tracker - tracker.lock().unwrap().aggregation_updates = 0; + tracker.lock().aggregation_updates = 0; // Add sale of 50 for Widget let mut delta = Delta::new(); @@ -822,7 +824,7 @@ mod tests { .block(|| agg.commit((&delta).into(), &mut cursors)) .unwrap(); - assert_eq!(tracker.lock().unwrap().aggregation_updates, 1); + assert_eq!(tracker.lock().aggregation_updates, 1); // Check final state - Widget should now be 300 (250 + 50) let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors); diff --git a/core/incremental/project_operator.rs b/core/incremental/project_operator.rs index 8103ac7ff..757132b07 100644 --- a/core/incremental/project_operator.rs +++ b/core/incremental/project_operator.rs @@ -8,7 +8,8 @@ use crate::incremental::operator::{ }; use crate::types::IOResult; use crate::{Connection, Database, Result, Value}; -use std::sync::{atomic::Ordering, Arc, Mutex}; +use parking_lot::Mutex; +use std::sync::{atomic::Ordering, Arc}; #[derive(Debug, Clone)] pub struct ProjectColumn { @@ -124,7 +125,7 @@ impl IncrementalOperator for ProjectOperator { for (row, weight) in delta.changes { if let Some(tracker) = &self.tracker { - tracker.lock().unwrap().record_project(); + tracker.lock().record_project(); } let projected = self.project_values(&row.values); @@ -152,7 +153,7 @@ impl IncrementalOperator for ProjectOperator { // Commit the delta to our internal state and build output for (row, weight) in &deltas.left.changes { if let Some(tracker) = &self.tracker { - tracker.lock().unwrap().record_project(); + tracker.lock().record_project(); } let projected = self.project_values(&row.values); let projected_row = HashableRow::new(row.rowid, projected); diff --git a/core/incremental/view.rs b/core/incremental/view.rs index 211271c82..1fa04e0fe 100644 --- a/core/incremental/view.rs +++ b/core/incremental/view.rs @@ -7,11 +7,12 @@ use crate::translate::logical::LogicalPlanBuilder; use crate::types::{IOResult, Value}; use crate::util::{extract_view_columns, ViewColumnSchema}; use crate::{return_if_io, LimboError, Pager, Result, Statement}; +use parking_lot::Mutex; use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::fmt; use std::rc::Rc; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use turso_parser::ast; use turso_parser::{ ast::{Cmd, Stmt}, diff --git a/core/io/memory.rs b/core/io/memory.rs index 2e047b3b0..d7f57fcd4 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -3,10 +3,11 @@ use crate::turso_assert; use crate::{io::clock::DefaultClock, Result}; use crate::io::clock::Instant; +use parking_lot::Mutex; use std::{ cell::{Cell, UnsafeCell}, collections::{BTreeMap, HashMap}, - sync::{Arc, Mutex}, + sync::Arc, }; use tracing::debug; @@ -42,7 +43,7 @@ impl Clock for MemoryIO { impl IO for MemoryIO { fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result> { - let mut files = self.files.lock().unwrap(); + let mut files = self.files.lock(); if !files.contains_key(path) && !flags.contains(OpenFlags::Create) { return Err( crate::error::CompletionError::IOError(std::io::ErrorKind::NotFound).into(), @@ -61,7 +62,7 @@ impl IO for MemoryIO { Ok(files.get(path).unwrap().clone()) } fn remove_file(&self, path: &str) -> Result<()> { - let mut files = self.files.lock().unwrap(); + let mut files = self.files.lock(); files.remove(path); Ok(()) } diff --git a/core/lib.rs b/core/lib.rs index 39c2cb1c4..2e69689eb 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -67,7 +67,7 @@ pub use io::{ Buffer, Completion, CompletionType, File, GroupCompletion, MemoryIO, OpenFlags, PlatformIO, SyscallIO, WriteCompletion, IO, }; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use rustc_hash::FxHashMap; use schema::Schema; use std::collections::HashSet; @@ -82,7 +82,7 @@ use std::{ rc::Rc, sync::{ atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicIsize, AtomicU16, AtomicUsize, Ordering}, - Arc, LazyLock, Mutex, Weak, + Arc, LazyLock, Weak, }, time::Duration, }; @@ -276,7 +276,7 @@ impl fmt::Debug for Database { }; debug_struct.field("mv_store", &mv_store_status); - let init_lock_status = if self.init_lock.try_lock().is_ok() { + let init_lock_status = if self.init_lock.try_lock().is_some() { "unlocked" } else { "locked" @@ -380,7 +380,7 @@ impl Database { ); } - let mut registry = DATABASE_MANAGER.lock().unwrap(); + let mut registry = DATABASE_MANAGER.lock(); let canonical_path = std::fs::canonicalize(path) .ok() @@ -623,7 +623,7 @@ impl Database { let conn = Arc::new(Connection { db: self.clone(), pager: ArcSwap::new(pager), - schema: RwLock::new(self.schema.lock().unwrap().clone()), + schema: RwLock::new(self.schema.lock().clone()), database_schemas: RwLock::new(FxHashMap::default()), auto_commit: AtomicBool::new(true), transaction_state: AtomicTransactionState::new(TransactionState::None), @@ -900,17 +900,17 @@ impl Database { #[inline] pub(crate) fn with_schema_mut(&self, f: impl FnOnce(&mut Schema) -> Result) -> Result { - let mut schema_ref = self.schema.lock().unwrap(); + let mut schema_ref = self.schema.lock(); let schema = Arc::make_mut(&mut *schema_ref); f(schema) } pub(crate) fn clone_schema(&self) -> Arc { - let schema = self.schema.lock().unwrap(); + let schema = self.schema.lock(); schema.clone() } pub(crate) fn update_schema_if_newer(&self, another: Arc) { - let mut schema = self.schema.lock().unwrap(); + let mut schema = self.schema.lock(); if schema.schema_version < another.schema_version { tracing::debug!( "DB schema is outdated: {} < {}", @@ -1352,7 +1352,7 @@ impl Connection { }; pager.end_read_tx(); - let db_schema_version = self.db.schema.lock().unwrap().schema_version; + let db_schema_version = self.db.schema.lock().schema_version; tracing::debug!( "path: {}, db_schema_version={} vs on_disk_schema_version={}", self.db.path, @@ -1676,7 +1676,7 @@ impl Connection { pub fn maybe_update_schema(&self) { let current_schema_version = self.schema.read().schema_version; - let schema = self.db.schema.lock().unwrap(); + let schema = self.db.schema.lock(); if matches!(self.get_tx_state(), TransactionState::None) && current_schema_version != schema.schema_version { @@ -2199,7 +2199,7 @@ impl Connection { ))); } - let use_indexes = self.db.schema.lock().unwrap().indexes_enabled(); + let use_indexes = self.db.schema.lock().indexes_enabled(); let use_mvcc = self.db.mv_store.is_some(); let use_views = self.db.experimental_views_enabled(); let use_strict = self.db.experimental_strict_enabled(); @@ -2316,11 +2316,7 @@ impl Connection { .get(&database_id) .expect("Database ID should be valid after resolve_database_id"); - let schema = db - .schema - .lock() - .expect("Schema lock should not fail") - .clone(); + let schema = db.schema.lock().clone(); // Cache the schema for future use schemas.insert(database_id, schema.clone()); diff --git a/core/mvcc/database/checkpoint_state_machine.rs b/core/mvcc/database/checkpoint_state_machine.rs index 9c457f0d8..15e1e9560 100644 --- a/core/mvcc/database/checkpoint_state_machine.rs +++ b/core/mvcc/database/checkpoint_state_machine.rs @@ -541,14 +541,8 @@ impl CheckpointStateMachine { .io .block(|| { self.pager.with_header_mut(|header| { - header.schema_cookie = self - .connection - .db - .schema - .lock() - .unwrap() - .schema_version - .into(); + header.schema_cookie = + self.connection.db.schema.lock().schema_version.into(); *header }) }) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 96f7236ff..9ad9f335d 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1057,7 +1057,7 @@ impl MvStore { // Make sure we capture all the schema changes that were deserialized from the logical log. bootstrap_conn.promote_to_regular_connection(); bootstrap_conn.reparse_schema()?; - *bootstrap_conn.db.schema.lock().unwrap() = bootstrap_conn.schema.read().clone(); + *bootstrap_conn.db.schema.lock() = bootstrap_conn.schema.read().clone(); Ok(()) } @@ -1644,9 +1644,7 @@ impl MvStore { } } - if connection.schema.read().schema_version - > connection.db.schema.lock().unwrap().schema_version - { + if connection.schema.read().schema_version > connection.db.schema.lock().schema_version { // Connection made schema changes during tx and rolled back -> revert connection-local schema. *connection.schema.write() = connection.db.clone_schema(); } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 497c94acd..cbc6d2281 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -74,7 +74,7 @@ impl MvccTestDbNoConn { // First let's clear any entries in database manager in order to force restart. // If not, we will load the same database instance again. { - let mut manager = DATABASE_MANAGER.lock().unwrap(); + let mut manager = DATABASE_MANAGER.lock(); manager.clear(); } diff --git a/core/schema.rs b/core/schema.rs index 84e3895ea..9803eef4e 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -127,10 +127,10 @@ use crate::{ }; use crate::{util::normalize_ident, Result}; use core::fmt; +use parking_lot::Mutex; use std::collections::{HashMap, HashSet, VecDeque}; use std::ops::Deref; use std::sync::Arc; -use std::sync::Mutex; use tracing::trace; use turso_parser::ast::{ self, ColumnDefinition, Expr, InitDeferredPred, Literal, RefAct, SortOrder, TableOptions, diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 84bfc4992..f200c78f6 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -7799,14 +7799,7 @@ mod tests { vdbe::Register, BufferPool, Completion, Connection, IOContext, StepResult, WalFile, WalFileShared, }; - use std::{ - cell::RefCell, - collections::HashSet, - mem::transmute, - ops::Deref, - rc::Rc, - sync::{Arc, Mutex}, - }; + use std::{cell::RefCell, collections::HashSet, mem::transmute, ops::Deref, rc::Rc, sync::Arc}; use tempfile::TempDir; @@ -9082,7 +9075,7 @@ mod tests { Arc::new(parking_lot::RwLock::new(PageCache::new(10))), buffer_pool, Arc::new(AtomicDbState::new(DbState::Uninitialized)), - Arc::new(Mutex::new(())), + Arc::new(parking_lot::Mutex::new(())), ) .unwrap(), ); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 8f57577d7..465f0393e 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -15,7 +15,7 @@ use crate::{ IOResult, LimboError, Result, TransactionState, }; use crate::{io_yield_one, CompletionError, IOContext, OpenFlags, IO}; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use roaring::RoaringBitmap; use std::cell::{RefCell, UnsafeCell}; use std::collections::BTreeSet; @@ -23,7 +23,7 @@ use std::rc::Rc; use std::sync::atomic::{ AtomicBool, AtomicU16, AtomicU32, AtomicU64, AtomicU8, AtomicUsize, Ordering, }; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tracing::{instrument, trace, Level}; use turso_macros::AtomicEnum; @@ -1423,7 +1423,7 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn maybe_allocate_page1(&self) -> Result> { if !self.db_state.get().is_initialized() { - if let Ok(_lock) = self.init_lock.try_lock() { + if let Some(_lock) = self.init_lock.try_lock() { match (self.db_state.get(), self.allocating_page1()) { // In case of being empty or (allocating and this connection is performing allocation) then allocate the first page (DbState::Uninitialized, false) | (DbState::Initializing, true) => { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 7b8ae3dd1..018ad16aa 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,10 +1,10 @@ #![allow(clippy::not_unsafe_ptr_arg_deref)] +use parking_lot::Mutex; use rustc_hash::{FxHashMap, FxHashSet}; use std::array; use std::borrow::Cow; use std::collections::BTreeMap; -use std::sync::Mutex; use strum::EnumString; use tracing::{instrument, Level}; @@ -1254,7 +1254,7 @@ impl Wal for WalFile { ); let page = unsafe { std::slice::from_raw_parts(page_ptr as *mut u8, page_len) }; if buf.as_slice() != page { - *conflict.lock().unwrap() = true; + *conflict.lock() = true; } } }); @@ -1271,7 +1271,7 @@ impl Wal for WalFile { &self.io_ctx.read(), )?; self.io.wait_for_completion(c)?; - return if *conflict.lock().unwrap() { + return if *conflict.lock() { Err(LimboError::Conflict(format!( "frame content differs from the WAL: frame_id={frame_id}" ))) @@ -2516,10 +2516,10 @@ pub mod test { CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO, StepResult, Wal, WalFile, WalFileShared, IO, }; - use parking_lot::RwLock; + use parking_lot::{Mutex, RwLock}; #[cfg(unix)] use std::os::unix::fs::MetadataExt; - use std::sync::{atomic::Ordering, Arc, Mutex}; + use std::sync::{atomic::Ordering, Arc}; #[allow(clippy::arc_with_non_send_sync)] pub(crate) fn get_database() -> (Arc, std::path::PathBuf) { let mut path = tempfile::tempdir().unwrap().keep(); @@ -2550,11 +2550,11 @@ pub mod test { let _ = wal_file.truncate( WAL_HEADER_SIZE as u64, Completion::new_trunc(move |_| { - *_done.lock().unwrap() = true; + *_done.lock() = true; }), ); assert!(wal_file.size().unwrap() == WAL_HEADER_SIZE as u64); - assert!(*done.lock().unwrap()); + assert!(*done.lock()); } #[test] diff --git a/core/translate/planner.rs b/core/translate/planner.rs index b1c01fd59..fa8bf82d5 100644 --- a/core/translate/planner.rs +++ b/core/translate/planner.rs @@ -491,7 +491,7 @@ fn parse_table( } // Check if this materialized view has persistent storage - let view_guard = view.lock().unwrap(); + let view_guard = view.lock(); let root_page = view_guard.get_root_page(); if root_page == 0 { diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 8cc0c110d..ef24fe090 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -553,7 +553,7 @@ fn query_pragma( if let Some(table) = schema.get_table(&name) { emit_columns_for_table_info(&mut program, table.columns(), base_reg); } else if let Some(view_mutex) = schema.get_materialized_view(&name) { - let view = view_mutex.lock().unwrap(); + let view = view_mutex.lock(); let flat_columns = view.column_schema.flat_columns(); emit_columns_for_table_info(&mut program, &flat_columns, base_reg); } else if let Some(view) = schema.get_view(&name) { diff --git a/core/util.rs b/core/util.rs index abada7cfa..31fce20c8 100644 --- a/core/util.rs +++ b/core/util.rs @@ -14,12 +14,9 @@ use crate::{ LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable, }; use crate::{Connection, MvStore, IO}; +use parking_lot::Mutex; use std::sync::atomic::AtomicU8; -use std::{ - collections::HashMap, - rc::Rc, - sync::{Arc, Mutex}, -}; +use std::{collections::HashMap, rc::Rc, sync::Arc}; use tracing::{instrument, Level}; use turso_macros::match_ignore_ascii_case; use turso_parser::ast::{ diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 811f14598..ffd3c8839 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -143,7 +143,7 @@ pub enum CursorType { VirtualTable(Arc), MaterializedView( Arc, - Arc>, + Arc>, ), } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 2cc7ad665..90ddd7511 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -47,7 +47,7 @@ use std::ops::DerefMut; use std::{ borrow::BorrowMut, num::NonZero, - sync::{atomic::Ordering, Arc, Mutex}, + sync::{atomic::Ordering, Arc}, }; use turso_macros::match_ignore_ascii_case; @@ -75,7 +75,7 @@ use super::{ insn::{Cookie, RegisterOrLiteral}, CommitState, }; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use turso_parser::ast::{self, ForeignKeyClause, Name, ResolveType}; use turso_parser::parser::Parser; @@ -966,7 +966,7 @@ pub fn op_open_read( let cursor = maybe_promote_to_mvcc_cursor(btree_cursor)?; // Get the view name and look up or create its transaction state - let view_name = view_mutex.lock().unwrap().name().to_string(); + let view_name = view_mutex.lock().name().to_string(); let tx_state = program .connection .view_transaction_states @@ -7902,7 +7902,7 @@ pub fn op_populate_materialized_views( for (view_name, _root_page, cursor_id) in view_info { let schema = conn.schema.read(); if let Some(view) = schema.get_materialized_view(&view_name) { - let mut view = view.lock().unwrap(); + let mut view = view.lock(); // Drop the schema borrow before calling populate_from_table drop(schema); diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 4047f6664..923365659 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -960,7 +960,7 @@ impl Program { let mut views = Vec::new(); for view_name in self.connection.view_transaction_states.get_view_names() { if let Some(view_mutex) = schema.get_materialized_view(&view_name) { - let view = view_mutex.lock().unwrap(); + let view = view_mutex.lock(); let root_page = view.get_root_page(); // Materialized views should always have storage (root_page != 0) @@ -1002,7 +1002,7 @@ impl Program { let schema = self.connection.schema.read(); if let Some(view_mutex) = schema.get_materialized_view(view_name) { - let mut view = view_mutex.lock().unwrap(); + let mut view = view_mutex.lock(); // Create a DeltaSet from the per-table deltas let mut delta_set = crate::incremental::compiler::DeltaSet::new();