Make schema a RWLock

This makes it work like in SQLite where only one schema writer is permitted and readers will return error while preparing statement if the schema is changing.
This commit is contained in:
Pere Diaz Bou
2025-03-04 16:57:54 +01:00
parent e4a8ee5402
commit e20dd59353
3 changed files with 17 additions and 17 deletions

View File

@@ -53,6 +53,8 @@ pub enum LimboError {
Unbound(NonZero<usize>),
#[error("Runtime error: integer overflow")]
IntegerOverflow,
#[error("Schema is locked for write")]
SchemaLocked,
}
#[macro_export]

View File

@@ -85,7 +85,7 @@ enum TransactionState {
pub struct Database {
// TODO: make schema work without lock
schema: Arc<Mutex<Schema>>,
schema: Arc<RwLock<Schema>>,
// TODO: make header work without lock
header: Arc<Mutex<DatabaseHeader>>,
page_io: Arc<dyn DatabaseStorage>,
@@ -131,7 +131,7 @@ impl Database {
let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10)));
let page_size = db_header.lock().unwrap().page_size;
let header = db_header;
let schema = Arc::new(Mutex::new(Schema::new()));
let schema = Arc::new(RwLock::new(Schema::new()));
let db = Database {
schema: schema.clone(),
header: header.clone(),
@@ -146,7 +146,7 @@ impl Database {
// parse schema
let conn = db.connect()?;
let rows = conn.query("SELECT * FROM sqlite_schema")?;
let mut schema = schema.lock().unwrap();
let mut schema = schema.try_write().expect("lock on schema should succeed first try");
let syms = conn.syms.borrow();
parse_schema_rows(rows, &mut schema, io, syms.deref())?;
}
@@ -240,7 +240,7 @@ pub fn maybe_init_database_file(file: &Arc<dyn File>, io: &Arc<dyn IO>) -> Resul
pub struct Connection {
db: Arc<Database>,
pager: Rc<Pager>,
schema: Arc<Mutex<Schema>>,
schema: Arc<RwLock<Schema>>,
header: Arc<Mutex<DatabaseHeader>>,
auto_commit: RefCell<bool>,
transaction_state: RefCell<TransactionState>,
@@ -257,11 +257,12 @@ impl Connection {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next()?;
let syms = self.syms.borrow();
let schema = self.schema.try_read().ok_or(LimboError::SchemaLocked)?;
if let Some(cmd) = cmd {
match cmd {
Cmd::Stmt(stmt) => {
let program = Rc::new(translate::translate(
&self.schema.lock().unwrap(),
&schema,
stmt,
self.header.clone(),
self.pager.clone(),
@@ -291,11 +292,12 @@ impl Connection {
}
pub(crate) fn run_cmd(self: &Rc<Connection>, cmd: Cmd) -> Result<Option<Statement>> {
let schema = self.schema.try_read().ok_or(LimboError::SchemaLocked)?;
let syms = self.syms.borrow();
match cmd {
Cmd::Stmt(stmt) => {
let program = Rc::new(translate::translate(
&self.schema.lock().unwrap(),
schema.deref(),
stmt,
self.header.clone(),
self.pager.clone(),
@@ -308,7 +310,7 @@ impl Connection {
}
Cmd::Explain(stmt) => {
let program = translate::translate(
&self.schema.lock().unwrap(),
schema.deref(),
stmt,
self.header.clone(),
self.pager.clone(),
@@ -322,13 +324,8 @@ impl Connection {
Cmd::ExplainQueryPlan(stmt) => {
match stmt {
ast::Stmt::Select(select) => {
let mut plan = prepare_select_plan(
&self.schema.lock().unwrap(),
*select,
&syms,
None,
)?;
optimize_plan(&mut plan, &self.schema.lock().unwrap())?;
let mut plan = prepare_select_plan(schema.deref(), *select, &syms, None)?;
optimize_plan(&mut plan, schema.deref())?;
println!("{}", plan);
}
_ => todo!(),
@@ -347,11 +344,12 @@ impl Connection {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next()?;
let syms = self.syms.borrow();
let schema = self.schema.try_read().ok_or(LimboError::SchemaLocked)?;
if let Some(cmd) = cmd {
match cmd {
Cmd::Explain(stmt) => {
let program = translate::translate(
&self.schema.lock().unwrap(),
&schema,
stmt,
self.header.clone(),
self.pager.clone(),
@@ -364,7 +362,7 @@ impl Connection {
Cmd::ExplainQueryPlan(_stmt) => todo!(),
Cmd::Stmt(stmt) => {
let program = translate::translate(
&self.schema.lock().unwrap(),
&schema,
stmt,
self.header.clone(),
self.pager.clone(),

View File

@@ -2998,7 +2998,7 @@ impl Program {
"SELECT * FROM sqlite_schema WHERE {}",
where_clause
))?;
let mut schema = conn.schema.lock().unwrap();
let mut schema = conn.schema.write();
// TODO: This function below is synchronous, make it async
parse_schema_rows(
Some(stmt),