mirror of
https://github.com/aljazceru/cdk.git
synced 2025-12-18 13:14:59 +01:00
fix: implement atomic keyset counter
- remove get_keyset_counter - update increment_keyset_counter to atomically increment and return counter value - replace get+increment pattern with atomic increment everywhere
This commit is contained in:
@@ -839,42 +839,44 @@ ON CONFLICT(id) DO UPDATE SET
|
||||
}
|
||||
|
||||
#[instrument(skip(self), fields(keyset_id = %keyset_id))]
|
||||
async fn increment_keyset_counter(&self, keyset_id: &Id, count: u32) -> Result<(), Self::Err> {
|
||||
async fn increment_keyset_counter(&self, keyset_id: &Id, count: u32) -> Result<u32, Self::Err> {
|
||||
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
|
||||
query(
|
||||
let tx = ConnectionWithTransaction::new(conn).await?;
|
||||
|
||||
// Lock the row and get current counter
|
||||
let current_counter = query(
|
||||
r#"
|
||||
UPDATE keyset
|
||||
SET counter=counter+:count
|
||||
SELECT counter
|
||||
FROM keyset
|
||||
WHERE id=:id
|
||||
"#,
|
||||
)?
|
||||
.bind("count", count)
|
||||
.bind("id", keyset_id.to_string())
|
||||
.execute(&*conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self), fields(keyset_id = %keyset_id))]
|
||||
async fn get_keyset_counter(&self, keyset_id: &Id) -> Result<u32, Self::Err> {
|
||||
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
|
||||
Ok(query(
|
||||
r#"
|
||||
SELECT
|
||||
counter
|
||||
FROM
|
||||
keyset
|
||||
WHERE
|
||||
id=:id
|
||||
FOR UPDATE
|
||||
"#,
|
||||
)?
|
||||
.bind("id", keyset_id.to_string())
|
||||
.pluck(&*conn)
|
||||
.pluck(&tx)
|
||||
.await?
|
||||
.map(|n| Ok::<_, Error>(column_as_number!(n)))
|
||||
.transpose()?
|
||||
.unwrap_or(0))
|
||||
.unwrap_or(0);
|
||||
|
||||
let new_counter = current_counter + count;
|
||||
|
||||
// Update with the new counter value
|
||||
query(
|
||||
r#"
|
||||
UPDATE keyset
|
||||
SET counter=:new_counter
|
||||
WHERE id=:id
|
||||
"#,
|
||||
)?
|
||||
.bind("new_counter", new_counter)
|
||||
.bind("id", keyset_id.to_string())
|
||||
.execute(&tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(new_counter)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
|
||||
Reference in New Issue
Block a user