mirror of
https://github.com/aljazceru/cdk.git
synced 2025-12-20 06:05:09 +01:00
feat(cdk): add generic key-value store functionality for mint databases (#1022)
* feat(cdk): add generic key-value store functionality for mint databases Implements a comprehensive KV store system with transaction support, namespace-based organization, and validation for mint databases. - Add KVStoreDatabase and KVStoreTransaction traits for generic storage - Include namespace and key validation with ASCII character restrictions - Add database migrations for kv_store table in SQLite and PostgreSQL - Implement comprehensive test suite for KV store functionality - Integrate KV store traits into existing Database and Transaction bounds
This commit is contained in:
@@ -16,6 +16,7 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use bitcoin::bip32::DerivationPath;
|
||||
use cdk_common::common::QuoteTTL;
|
||||
use cdk_common::database::mint::validate_kvstore_params;
|
||||
use cdk_common::database::{
|
||||
self, ConversionError, Error, MintDatabase, MintDbWriterFinalizer, MintKeyDatabaseTransaction,
|
||||
MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, MintQuotesTransaction,
|
||||
@@ -1582,6 +1583,224 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<RM> database::MintKVStoreTransaction<'_, Error> for SQLTransaction<RM>
|
||||
where
|
||||
RM: DatabasePool + 'static,
|
||||
{
|
||||
async fn kv_read(
|
||||
&mut self,
|
||||
primary_namespace: &str,
|
||||
secondary_namespace: &str,
|
||||
key: &str,
|
||||
) -> Result<Option<Vec<u8>>, Error> {
|
||||
// Validate parameters according to KV store requirements
|
||||
validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
|
||||
Ok(query(
|
||||
r#"
|
||||
SELECT value
|
||||
FROM kv_store
|
||||
WHERE primary_namespace = :primary_namespace
|
||||
AND secondary_namespace = :secondary_namespace
|
||||
AND key = :key
|
||||
"#,
|
||||
)?
|
||||
.bind("primary_namespace", primary_namespace.to_owned())
|
||||
.bind("secondary_namespace", secondary_namespace.to_owned())
|
||||
.bind("key", key.to_owned())
|
||||
.pluck(&self.inner)
|
||||
.await?
|
||||
.and_then(|col| match col {
|
||||
Column::Blob(data) => Some(data),
|
||||
_ => None,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn kv_write(
|
||||
&mut self,
|
||||
primary_namespace: &str,
|
||||
secondary_namespace: &str,
|
||||
key: &str,
|
||||
value: &[u8],
|
||||
) -> Result<(), Error> {
|
||||
// Validate parameters according to KV store requirements
|
||||
validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
|
||||
|
||||
let current_time = unix_time();
|
||||
|
||||
query(
|
||||
r#"
|
||||
INSERT INTO kv_store
|
||||
(primary_namespace, secondary_namespace, key, value, created_time, updated_time)
|
||||
VALUES (:primary_namespace, :secondary_namespace, :key, :value, :created_time, :updated_time)
|
||||
ON CONFLICT(primary_namespace, secondary_namespace, key)
|
||||
DO UPDATE SET
|
||||
value = excluded.value,
|
||||
updated_time = excluded.updated_time
|
||||
"#,
|
||||
)?
|
||||
.bind("primary_namespace", primary_namespace.to_owned())
|
||||
.bind("secondary_namespace", secondary_namespace.to_owned())
|
||||
.bind("key", key.to_owned())
|
||||
.bind("value", value.to_vec())
|
||||
.bind("created_time", current_time as i64)
|
||||
.bind("updated_time", current_time as i64)
|
||||
.execute(&self.inner)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn kv_remove(
|
||||
&mut self,
|
||||
primary_namespace: &str,
|
||||
secondary_namespace: &str,
|
||||
key: &str,
|
||||
) -> Result<(), Error> {
|
||||
// Validate parameters according to KV store requirements
|
||||
validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
|
||||
query(
|
||||
r#"
|
||||
DELETE FROM kv_store
|
||||
WHERE primary_namespace = :primary_namespace
|
||||
AND secondary_namespace = :secondary_namespace
|
||||
AND key = :key
|
||||
"#,
|
||||
)?
|
||||
.bind("primary_namespace", primary_namespace.to_owned())
|
||||
.bind("secondary_namespace", secondary_namespace.to_owned())
|
||||
.bind("key", key.to_owned())
|
||||
.execute(&self.inner)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn kv_list(
|
||||
&mut self,
|
||||
primary_namespace: &str,
|
||||
secondary_namespace: &str,
|
||||
) -> Result<Vec<String>, Error> {
|
||||
// Validate namespace parameters according to KV store requirements
|
||||
cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
|
||||
cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
|
||||
|
||||
// Check empty namespace rules
|
||||
if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
|
||||
return Err(Error::KVStoreInvalidKey(
|
||||
"If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
|
||||
));
|
||||
}
|
||||
Ok(query(
|
||||
r#"
|
||||
SELECT key
|
||||
FROM kv_store
|
||||
WHERE primary_namespace = :primary_namespace
|
||||
AND secondary_namespace = :secondary_namespace
|
||||
ORDER BY key
|
||||
"#,
|
||||
)?
|
||||
.bind("primary_namespace", primary_namespace.to_owned())
|
||||
.bind("secondary_namespace", secondary_namespace.to_owned())
|
||||
.fetch_all(&self.inner)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| Ok(column_as_string!(&row[0])))
|
||||
.collect::<Result<Vec<_>, Error>>()?)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<RM> database::MintKVStoreDatabase for SQLMintDatabase<RM>
|
||||
where
|
||||
RM: DatabasePool + 'static,
|
||||
{
|
||||
type Err = Error;
|
||||
|
||||
async fn kv_read(
|
||||
&self,
|
||||
primary_namespace: &str,
|
||||
secondary_namespace: &str,
|
||||
key: &str,
|
||||
) -> Result<Option<Vec<u8>>, Error> {
|
||||
// Validate parameters according to KV store requirements
|
||||
validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
|
||||
|
||||
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
|
||||
Ok(query(
|
||||
r#"
|
||||
SELECT value
|
||||
FROM kv_store
|
||||
WHERE primary_namespace = :primary_namespace
|
||||
AND secondary_namespace = :secondary_namespace
|
||||
AND key = :key
|
||||
"#,
|
||||
)?
|
||||
.bind("primary_namespace", primary_namespace.to_owned())
|
||||
.bind("secondary_namespace", secondary_namespace.to_owned())
|
||||
.bind("key", key.to_owned())
|
||||
.pluck(&*conn)
|
||||
.await?
|
||||
.and_then(|col| match col {
|
||||
Column::Blob(data) => Some(data),
|
||||
_ => None,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn kv_list(
|
||||
&self,
|
||||
primary_namespace: &str,
|
||||
secondary_namespace: &str,
|
||||
) -> Result<Vec<String>, Error> {
|
||||
// Validate namespace parameters according to KV store requirements
|
||||
cdk_common::database::mint::validate_kvstore_string(primary_namespace)?;
|
||||
cdk_common::database::mint::validate_kvstore_string(secondary_namespace)?;
|
||||
|
||||
// Check empty namespace rules
|
||||
if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
|
||||
return Err(Error::KVStoreInvalidKey(
|
||||
"If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
|
||||
Ok(query(
|
||||
r#"
|
||||
SELECT key
|
||||
FROM kv_store
|
||||
WHERE primary_namespace = :primary_namespace
|
||||
AND secondary_namespace = :secondary_namespace
|
||||
ORDER BY key
|
||||
"#,
|
||||
)?
|
||||
.bind("primary_namespace", primary_namespace.to_owned())
|
||||
.bind("secondary_namespace", secondary_namespace.to_owned())
|
||||
.fetch_all(&*conn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| Ok(column_as_string!(&row[0])))
|
||||
.collect::<Result<Vec<_>, Error>>()?)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<RM> database::MintKVStore for SQLMintDatabase<RM>
|
||||
where
|
||||
RM: DatabasePool + 'static,
|
||||
{
|
||||
async fn begin_transaction<'a>(
|
||||
&'a self,
|
||||
) -> Result<Box<dyn database::MintKVStoreTransaction<'a, Self::Err> + Send + Sync + 'a>, Error>
|
||||
{
|
||||
Ok(Box::new(SQLTransaction {
|
||||
inner: ConnectionWithTransaction::new(
|
||||
self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
|
||||
)
|
||||
.await?,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<RM> MintDatabase<Error> for SQLMintDatabase<RM>
|
||||
where
|
||||
|
||||
Reference in New Issue
Block a user