mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-10 02:34:20 +01:00
add methods for read/write schema cookie
This commit is contained in:
47
core/lib.rs
47
core/lib.rs
@@ -1218,6 +1218,50 @@ impl Connection {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
|
||||
pub fn read_schema_version(&self) -> Result<u32> {
|
||||
loop {
|
||||
let pager = self.pager.borrow();
|
||||
match pager.with_header(|header| header.schema_cookie)? {
|
||||
IOResult::Done(cookie) => return Ok(cookie.get()),
|
||||
IOResult::IO => {
|
||||
self.run_once()?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
|
||||
pub fn write_schema_version(self: &Arc<Connection>, version: u32) -> Result<()> {
|
||||
let TransactionState::Write { .. } = self.transaction_state.get() else {
|
||||
return Err(LimboError::InternalError(
|
||||
"write_schema_version must be called from within Write transaction".to_string(),
|
||||
));
|
||||
};
|
||||
loop {
|
||||
let pager = self.pager.borrow();
|
||||
match pager.with_header_mut(|header| {
|
||||
turso_assert!(
|
||||
header.schema_cookie.get() < version,
|
||||
"cookie can't go back in time"
|
||||
);
|
||||
self.transaction_state.replace(TransactionState::Write {
|
||||
schema_did_change: true,
|
||||
});
|
||||
self.with_schema_mut(|schema| schema.schema_version = version);
|
||||
header.schema_cookie = version.into();
|
||||
})? {
|
||||
IOResult::Done(()) => break,
|
||||
IOResult::IO => {
|
||||
self.run_once()?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Try to read page with given ID with fixed WAL watermark position
|
||||
/// This method return false if page is not found (so, this is probably new page created after watermark position which wasn't checkpointed to the DB file yet)
|
||||
#[cfg(all(feature = "fs", feature = "conn_raw_api"))]
|
||||
@@ -1917,11 +1961,12 @@ impl Statement {
|
||||
let mut res = self
|
||||
.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
for _ in 0..MAX_SCHEMA_RETRY {
|
||||
for attempt in 0..MAX_SCHEMA_RETRY {
|
||||
// Only reprepare if we still need to update schema
|
||||
if !matches!(res, Err(LimboError::SchemaUpdated)) {
|
||||
break;
|
||||
}
|
||||
tracing::debug!("reprepare: attempt={}", attempt);
|
||||
self.reprepare()?;
|
||||
res = self
|
||||
.program
|
||||
|
||||
@@ -849,6 +849,7 @@ impl Wal for WalFile {
|
||||
}
|
||||
|
||||
// Snapshot is stale, give up and let caller retry from scratch
|
||||
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch");
|
||||
shared.write_lock.unlock();
|
||||
Ok(LimboResult::Busy)
|
||||
}
|
||||
|
||||
@@ -2062,6 +2062,11 @@ pub fn op_transaction(
|
||||
match res {
|
||||
Ok(header_schema_cookie) => {
|
||||
if header_schema_cookie != *schema_cookie {
|
||||
tracing::info!(
|
||||
"schema changed, force reprepare: {} != {}",
|
||||
header_schema_cookie,
|
||||
*schema_cookie
|
||||
);
|
||||
return Err(LimboError::SchemaUpdated);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user