From 5922e140fdae81fe469c8618fda7665dc9bdeb3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Granh=C3=A3o?= <32176319+danielgranhao@users.noreply.github.com> Date: Fri, 11 Apr 2025 16:41:16 +0100 Subject: [PATCH] Wasm: prevent cache corruption and gracefully handle it (#875) --- .../src/platform/browser/wallet_persister.rs | 132 +++++++++++++++--- lib/wasm/src/platform/mod.rs | 2 +- .../src/platform/wallet_persister_common.rs | 6 +- 3 files changed, 119 insertions(+), 21 deletions(-) diff --git a/lib/wasm/src/platform/browser/wallet_persister.rs b/lib/wasm/src/platform/browser/wallet_persister.rs index ee6d118..0ae24de 100644 --- a/lib/wasm/src/platform/browser/wallet_persister.rs +++ b/lib/wasm/src/platform/browser/wallet_persister.rs @@ -3,10 +3,12 @@ use anyhow::{anyhow, Context}; use breez_sdk_liquid::wallet::persister::lwk_wollet::{PersistError, Update, WolletDescriptor}; use breez_sdk_liquid::wallet::persister::{lwk_wollet, LwkPersister, WalletCachePersister}; use indexed_db_futures::database::Database; +use indexed_db_futures::iter::ArrayMapIter; +use indexed_db_futures::object_store::ObjectStore; use indexed_db_futures::query_source::QuerySource; use indexed_db_futures::transaction::TransactionMode; use indexed_db_futures::Build; -use log::info; +use log::{info, warn}; use std::path::Path; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc::{Receiver, Sender}; @@ -61,6 +63,7 @@ struct AsyncLwkPersister { impl AsyncLwkPersister { async fn new(storage: Arc) -> anyhow::Result { let updates = storage.load_updates().await?; + info!("Loaded {} updates from storage", updates.len()); let (sender, receiver) = tokio::sync::mpsc::channel(20); @@ -89,11 +92,11 @@ impl AsyncLwkPersister { } impl lwk_wollet::Persister for AsyncLwkPersister { - fn get(&self, index: usize) -> std::result::Result, PersistError> { + fn get(&self, index: usize) -> Result, PersistError> { Ok(self.updates.lock().unwrap().get(index).cloned()) } - fn push(&self, update: Update) -> std::result::Result<(), PersistError> { + fn push(&self, update: Update) -> Result<(), PersistError> { let mut updates = self.updates.lock().unwrap(); let (update, write_index) = maybe_merge_updates(update, updates.last(), updates.len()); @@ -140,18 +143,25 @@ impl AsyncWalletStorage for IndexedDbWalletStorage { .object_store(IDB_STORE_NAME) .map_err(|e| anyhow!("Failed to open object store: {}", e))?; - let updates_count = store - .count() + let updates_bytes: ArrayMapIter> = store + .get_all() .await - .map_err(|e| anyhow!("Failed to get next index: {}", e))?; + .map_err(|e| anyhow!("Failed to get all updates: {}", e))?; + + let max_index = get_max_index(&store).await?; + if let Some(max_index) = max_index { + if max_index != updates_bytes.len() as u32 - 1 { + warn!("Wallet cache updates in IndexedDb are not contiguous. The (length - 1) is {}, but the max index is {max_index}. \ + This means it got corrupted (likely by concurrent SDK instances). Clearing the cache.", updates_bytes.len() as u32 - 1); + self.clear().await?; + return Ok(Vec::new()); + } + } let mut updates = Vec::new(); - for i in 0..updates_count { - let update_bytes: Vec = store - .get(i) - .await - .map_err(|e| anyhow!("Failed to get update bytes: {}", e))? - .ok_or(anyhow!("Missing update on index {i}"))?; + for update_bytes_result in updates_bytes { + let update_bytes = + update_bytes_result.map_err(|e| anyhow!("Failed to get update bytes: {}", e))?; updates.push( Update::deserialize_decrypted(&update_bytes, &self.desc) .context("Failed to deserialize update")?, @@ -180,6 +190,18 @@ impl AsyncWalletStorage for IndexedDbWalletStorage { .object_store(IDB_STORE_NAME) .map_err(|e| anyhow!("Failed to open object store: {e}"))?; + let max_index = get_max_index(&store).await?; + + if let Some(max_index) = max_index { + if index > max_index + 1 { + return Err(anyhow!( + "Index {index} is greater than the maximum index {max_index} + 1" + )); + } + } else if index != 0 { + return Err(anyhow!("Index {index} is not 0")); + } + store .put(update_bytes) .with_key(index) @@ -210,15 +232,15 @@ impl AsyncWalletStorage for IndexedDbWalletStorage { .clear() .map_err(|e| anyhow!("Failed to clear object store: {}", e))?; - tx.commit().await.map_err(|e| { - lwk_wollet::PersistError::Other(format!("Failed to commit transaction: {}", e)) - })?; + tx.commit() + .await + .map_err(|e| PersistError::Other(format!("Failed to commit transaction: {}", e)))?; Ok(()) } } -pub(crate) async fn open_indexed_db(name: &str) -> Result { +pub(crate) async fn open_indexed_db(name: &str) -> Result { let db = Database::open(name) .with_version(1u32) .with_on_upgrade_needed(|event, db| { @@ -229,6 +251,82 @@ pub(crate) async fn open_indexed_db(name: &str) -> Result) -> Result, PersistError> { + let keys: ArrayMapIter = store.get_all_keys().await.map_err(|e| { + PersistError::Other(format!("Failed to get all keys from object store: {}", e)) + })?; + let keys = keys.filter_map(|k| k.ok()).collect::>(); + Ok(keys.into_iter().max()) +} + +#[cfg(test)] +mod tests { + use crate::platform::browser::wallet_persister::{ + open_indexed_db, AsyncWalletStorage, IndexedDbWalletStorage, IDB_STORE_NAME, + }; + use crate::platform::wallet_persister_common::tests::{get_lwk_update, get_wollet_descriptor}; + use anyhow::anyhow; + use indexed_db_futures::transaction::TransactionMode; + use indexed_db_futures::Build; + + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + #[sdk_macros::async_test_wasm] + async fn test_load_updates() -> anyhow::Result<()> { + let desc = get_wollet_descriptor()?; + let storage = IndexedDbWalletStorage::new("test-load-updates".as_ref(), desc); + + // Non-contiguous keys are prevented (first index is 0) + assert!(storage + .persist_update(get_lwk_update(1, false), 1) + .await + .is_err()); + assert!(storage.load_updates().await?.is_empty()); + + storage.persist_update(get_lwk_update(5, false), 0).await?; + storage.persist_update(get_lwk_update(10, false), 1).await?; + + // Overwritting is allowed + storage.persist_update(get_lwk_update(15, false), 1).await?; + + let updates = storage.load_updates().await?; + assert_eq!(updates.len(), 2); + + // Non-contiguous keys are prevented + assert!(storage + .persist_update(get_lwk_update(10, false), 3) + .await + .is_err()); + + let updates = storage.load_updates().await?; + assert_eq!(updates.len(), 2); + + // If for any reason there is a gap, the cache is cleared on load + storage.persist_update(get_lwk_update(15, false), 2).await?; + delete_update_on_index(1, &storage.db_name).await; + let updates = storage.load_updates().await?; + assert_eq!(updates.len(), 0); + + Ok(()) + } + + async fn delete_update_on_index(index: u32, db_name: &str) { + let idb = open_indexed_db(db_name).await.unwrap(); + + let tx = idb + .transaction([IDB_STORE_NAME]) + .with_mode(TransactionMode::Readwrite) + .build() + .unwrap(); + + let store = tx.object_store(IDB_STORE_NAME).unwrap(); + + store.delete(index).await.unwrap(); + + tx.commit().await.unwrap(); + } +} diff --git a/lib/wasm/src/platform/mod.rs b/lib/wasm/src/platform/mod.rs index f232e74..006c8eb 100644 --- a/lib/wasm/src/platform/mod.rs +++ b/lib/wasm/src/platform/mod.rs @@ -25,4 +25,4 @@ pub(crate) use default::{create_db_backup_persister, create_onchain_wallet}; )] pub(crate) mod db_backup_common; #[cfg(any(feature = "browser", feature = "node-js"))] -mod wallet_persister_common; +pub(crate) mod wallet_persister_common; diff --git a/lib/wasm/src/platform/wallet_persister_common.rs b/lib/wasm/src/platform/wallet_persister_common.rs index ac8b35b..bce5c04 100644 --- a/lib/wasm/src/platform/wallet_persister_common.rs +++ b/lib/wasm/src/platform/wallet_persister_common.rs @@ -20,7 +20,7 @@ pub(crate) fn maybe_merge_updates( #[cfg(any(feature = "browser", feature = "node-js"))] #[cfg(test)] -mod tests { +pub(crate) mod tests { use crate::platform::create_wallet_persister; use breez_sdk_liquid::elements::hashes::Hash; use breez_sdk_liquid::elements::{BlockHash, BlockHeader, TxMerkleNode, Txid}; @@ -38,7 +38,7 @@ mod tests { #[cfg(feature = "browser")] wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); - fn get_wollet_descriptor() -> anyhow::Result { + pub(crate) fn get_wollet_descriptor() -> anyhow::Result { let signer: Rc> = Rc::new(Box::new(SdkSigner::new("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", "", false)?)); let sdk_lwk_signer = SdkLwkSigner::new(signer)?; Ok(get_descriptor(&sdk_lwk_signer, LiquidNetwork::Testnet)?) @@ -104,7 +104,7 @@ mod tests { Ok(()) } - fn get_lwk_update(height: u32, only_tip: bool) -> lwk_wollet::Update { + pub(crate) fn get_lwk_update(height: u32, only_tip: bool) -> lwk_wollet::Update { let txid_height_new = match only_tip { true => Vec::new(), false => {