mirror of
https://github.com/aljazceru/breez-sdk-liquid.git
synced 2026-01-28 02:14:30 +01:00
Wasm: prevent cache corruption and gracefully handle it (#875)
This commit is contained in:
@@ -3,10 +3,12 @@ use anyhow::{anyhow, Context};
|
||||
use breez_sdk_liquid::wallet::persister::lwk_wollet::{PersistError, Update, WolletDescriptor};
|
||||
use breez_sdk_liquid::wallet::persister::{lwk_wollet, LwkPersister, WalletCachePersister};
|
||||
use indexed_db_futures::database::Database;
|
||||
use indexed_db_futures::iter::ArrayMapIter;
|
||||
use indexed_db_futures::object_store::ObjectStore;
|
||||
use indexed_db_futures::query_source::QuerySource;
|
||||
use indexed_db_futures::transaction::TransactionMode;
|
||||
use indexed_db_futures::Build;
|
||||
use log::info;
|
||||
use log::{info, warn};
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
@@ -61,6 +63,7 @@ struct AsyncLwkPersister<S: AsyncWalletStorage> {
|
||||
impl<S: AsyncWalletStorage> AsyncLwkPersister<S> {
|
||||
async fn new(storage: Arc<S>) -> anyhow::Result<Self> {
|
||||
let updates = storage.load_updates().await?;
|
||||
info!("Loaded {} updates from storage", updates.len());
|
||||
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(20);
|
||||
|
||||
@@ -89,11 +92,11 @@ impl<S: AsyncWalletStorage> AsyncLwkPersister<S> {
|
||||
}
|
||||
|
||||
impl<S: AsyncWalletStorage> lwk_wollet::Persister for AsyncLwkPersister<S> {
|
||||
fn get(&self, index: usize) -> std::result::Result<Option<Update>, PersistError> {
|
||||
fn get(&self, index: usize) -> Result<Option<Update>, PersistError> {
|
||||
Ok(self.updates.lock().unwrap().get(index).cloned())
|
||||
}
|
||||
|
||||
fn push(&self, update: Update) -> std::result::Result<(), PersistError> {
|
||||
fn push(&self, update: Update) -> Result<(), PersistError> {
|
||||
let mut updates = self.updates.lock().unwrap();
|
||||
|
||||
let (update, write_index) = maybe_merge_updates(update, updates.last(), updates.len());
|
||||
@@ -140,18 +143,25 @@ impl AsyncWalletStorage for IndexedDbWalletStorage {
|
||||
.object_store(IDB_STORE_NAME)
|
||||
.map_err(|e| anyhow!("Failed to open object store: {}", e))?;
|
||||
|
||||
let updates_count = store
|
||||
.count()
|
||||
let updates_bytes: ArrayMapIter<Vec<u8>> = store
|
||||
.get_all()
|
||||
.await
|
||||
.map_err(|e| anyhow!("Failed to get next index: {}", e))?;
|
||||
.map_err(|e| anyhow!("Failed to get all updates: {}", e))?;
|
||||
|
||||
let max_index = get_max_index(&store).await?;
|
||||
if let Some(max_index) = max_index {
|
||||
if max_index != updates_bytes.len() as u32 - 1 {
|
||||
warn!("Wallet cache updates in IndexedDb are not contiguous. The (length - 1) is {}, but the max index is {max_index}. \
|
||||
This means it got corrupted (likely by concurrent SDK instances). Clearing the cache.", updates_bytes.len() as u32 - 1);
|
||||
self.clear().await?;
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
}
|
||||
|
||||
let mut updates = Vec::new();
|
||||
for i in 0..updates_count {
|
||||
let update_bytes: Vec<u8> = store
|
||||
.get(i)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Failed to get update bytes: {}", e))?
|
||||
.ok_or(anyhow!("Missing update on index {i}"))?;
|
||||
for update_bytes_result in updates_bytes {
|
||||
let update_bytes =
|
||||
update_bytes_result.map_err(|e| anyhow!("Failed to get update bytes: {}", e))?;
|
||||
updates.push(
|
||||
Update::deserialize_decrypted(&update_bytes, &self.desc)
|
||||
.context("Failed to deserialize update")?,
|
||||
@@ -180,6 +190,18 @@ impl AsyncWalletStorage for IndexedDbWalletStorage {
|
||||
.object_store(IDB_STORE_NAME)
|
||||
.map_err(|e| anyhow!("Failed to open object store: {e}"))?;
|
||||
|
||||
let max_index = get_max_index(&store).await?;
|
||||
|
||||
if let Some(max_index) = max_index {
|
||||
if index > max_index + 1 {
|
||||
return Err(anyhow!(
|
||||
"Index {index} is greater than the maximum index {max_index} + 1"
|
||||
));
|
||||
}
|
||||
} else if index != 0 {
|
||||
return Err(anyhow!("Index {index} is not 0"));
|
||||
}
|
||||
|
||||
store
|
||||
.put(update_bytes)
|
||||
.with_key(index)
|
||||
@@ -210,15 +232,15 @@ impl AsyncWalletStorage for IndexedDbWalletStorage {
|
||||
.clear()
|
||||
.map_err(|e| anyhow!("Failed to clear object store: {}", e))?;
|
||||
|
||||
tx.commit().await.map_err(|e| {
|
||||
lwk_wollet::PersistError::Other(format!("Failed to commit transaction: {}", e))
|
||||
})?;
|
||||
tx.commit()
|
||||
.await
|
||||
.map_err(|e| PersistError::Other(format!("Failed to commit transaction: {}", e)))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn open_indexed_db(name: &str) -> Result<Database, lwk_wollet::PersistError> {
|
||||
pub(crate) async fn open_indexed_db(name: &str) -> Result<Database, PersistError> {
|
||||
let db = Database::open(name)
|
||||
.with_version(1u32)
|
||||
.with_on_upgrade_needed(|event, db| {
|
||||
@@ -229,6 +251,82 @@ pub(crate) async fn open_indexed_db(name: &str) -> Result<Database, lwk_wollet::
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.map_err(|e| lwk_wollet::PersistError::Other(format!("Failed to open IndexedDB: {}", e)))?;
|
||||
.map_err(|e| PersistError::Other(format!("Failed to open IndexedDB: {}", e)))?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
async fn get_max_index(store: &ObjectStore<'_>) -> Result<Option<u32>, PersistError> {
|
||||
let keys: ArrayMapIter<u32> = store.get_all_keys().await.map_err(|e| {
|
||||
PersistError::Other(format!("Failed to get all keys from object store: {}", e))
|
||||
})?;
|
||||
let keys = keys.filter_map(|k| k.ok()).collect::<Vec<_>>();
|
||||
Ok(keys.into_iter().max())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::platform::browser::wallet_persister::{
|
||||
open_indexed_db, AsyncWalletStorage, IndexedDbWalletStorage, IDB_STORE_NAME,
|
||||
};
|
||||
use crate::platform::wallet_persister_common::tests::{get_lwk_update, get_wollet_descriptor};
|
||||
use anyhow::anyhow;
|
||||
use indexed_db_futures::transaction::TransactionMode;
|
||||
use indexed_db_futures::Build;
|
||||
|
||||
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
|
||||
|
||||
#[sdk_macros::async_test_wasm]
|
||||
async fn test_load_updates() -> anyhow::Result<()> {
|
||||
let desc = get_wollet_descriptor()?;
|
||||
let storage = IndexedDbWalletStorage::new("test-load-updates".as_ref(), desc);
|
||||
|
||||
// Non-contiguous keys are prevented (first index is 0)
|
||||
assert!(storage
|
||||
.persist_update(get_lwk_update(1, false), 1)
|
||||
.await
|
||||
.is_err());
|
||||
assert!(storage.load_updates().await?.is_empty());
|
||||
|
||||
storage.persist_update(get_lwk_update(5, false), 0).await?;
|
||||
storage.persist_update(get_lwk_update(10, false), 1).await?;
|
||||
|
||||
// Overwritting is allowed
|
||||
storage.persist_update(get_lwk_update(15, false), 1).await?;
|
||||
|
||||
let updates = storage.load_updates().await?;
|
||||
assert_eq!(updates.len(), 2);
|
||||
|
||||
// Non-contiguous keys are prevented
|
||||
assert!(storage
|
||||
.persist_update(get_lwk_update(10, false), 3)
|
||||
.await
|
||||
.is_err());
|
||||
|
||||
let updates = storage.load_updates().await?;
|
||||
assert_eq!(updates.len(), 2);
|
||||
|
||||
// If for any reason there is a gap, the cache is cleared on load
|
||||
storage.persist_update(get_lwk_update(15, false), 2).await?;
|
||||
delete_update_on_index(1, &storage.db_name).await;
|
||||
let updates = storage.load_updates().await?;
|
||||
assert_eq!(updates.len(), 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_update_on_index(index: u32, db_name: &str) {
|
||||
let idb = open_indexed_db(db_name).await.unwrap();
|
||||
|
||||
let tx = idb
|
||||
.transaction([IDB_STORE_NAME])
|
||||
.with_mode(TransactionMode::Readwrite)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let store = tx.object_store(IDB_STORE_NAME).unwrap();
|
||||
|
||||
store.delete(index).await.unwrap();
|
||||
|
||||
tx.commit().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,4 +25,4 @@ pub(crate) use default::{create_db_backup_persister, create_onchain_wallet};
|
||||
)]
|
||||
pub(crate) mod db_backup_common;
|
||||
#[cfg(any(feature = "browser", feature = "node-js"))]
|
||||
mod wallet_persister_common;
|
||||
pub(crate) mod wallet_persister_common;
|
||||
|
||||
@@ -20,7 +20,7 @@ pub(crate) fn maybe_merge_updates(
|
||||
|
||||
#[cfg(any(feature = "browser", feature = "node-js"))]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
pub(crate) mod tests {
|
||||
use crate::platform::create_wallet_persister;
|
||||
use breez_sdk_liquid::elements::hashes::Hash;
|
||||
use breez_sdk_liquid::elements::{BlockHash, BlockHeader, TxMerkleNode, Txid};
|
||||
@@ -38,7 +38,7 @@ mod tests {
|
||||
#[cfg(feature = "browser")]
|
||||
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
|
||||
|
||||
fn get_wollet_descriptor() -> anyhow::Result<WolletDescriptor> {
|
||||
pub(crate) fn get_wollet_descriptor() -> anyhow::Result<WolletDescriptor> {
|
||||
let signer: Rc<Box<dyn Signer>> = Rc::new(Box::new(SdkSigner::new("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", "", false)?));
|
||||
let sdk_lwk_signer = SdkLwkSigner::new(signer)?;
|
||||
Ok(get_descriptor(&sdk_lwk_signer, LiquidNetwork::Testnet)?)
|
||||
@@ -104,7 +104,7 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_lwk_update(height: u32, only_tip: bool) -> lwk_wollet::Update {
|
||||
pub(crate) fn get_lwk_update(height: u32, only_tip: bool) -> lwk_wollet::Update {
|
||||
let txid_height_new = match only_tip {
|
||||
true => Vec::new(),
|
||||
false => {
|
||||
|
||||
Reference in New Issue
Block a user