From 95bbd0e49b2587be4b00a2a7527e4ed0aaa4be1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Granh=C3=A3o?= <32176319+danielgranhao@users.noreply.github.com> Date: Sun, 12 Jan 2025 22:44:15 +0000 Subject: [PATCH] Implement optimistic locking for read-update-write swap operations (#652) * Implement optimistic locking for read-update-write swap operations * Fail on stale swap update and add tests --- lib/core/src/model.rs | 27 ++++++++++++++ lib/core/src/persist/chain.rs | 52 ++++++++++++++++++++++++--- lib/core/src/persist/migrations.rs | 23 ++++++++++++ lib/core/src/persist/receive.rs | 44 ++++++++++++++++++++--- lib/core/src/persist/send.rs | 45 +++++++++++++++++++---- lib/core/src/sdk.rs | 4 +++ lib/core/src/sync/mod.rs | 27 ++++++++------ lib/core/src/sync/model/data.rs | 3 ++ lib/core/src/test_utils/chain_swap.rs | 3 ++ lib/core/src/test_utils/persist.rs | 2 ++ 10 files changed, 205 insertions(+), 25 deletions(-) diff --git a/lib/core/src/model.rs b/lib/core/src/model.rs index e89203b..6687476 100644 --- a/lib/core/src/model.rs +++ b/lib/core/src/model.rs @@ -670,6 +670,27 @@ impl Swap { | Swap::Receive(ReceiveSwap { id, .. }) => id.clone(), } } + + pub(crate) fn version(&self) -> u64 { + match self { + Swap::Chain(ChainSwap { version, .. }) + | Swap::Send(SendSwap { version, .. }) + | Swap::Receive(ReceiveSwap { version, .. }) => *version, + } + } + pub(crate) fn set_version(&mut self, version: u64) { + match self { + Swap::Chain(chain_swap) => { + chain_swap.version = version; + } + Swap::Send(send_swap) => { + send_swap.version = version; + } + Swap::Receive(receive_swap) => { + receive_swap.version = version; + } + } + } } impl From for Swap { fn from(swap: ChainSwap) -> Self { @@ -771,6 +792,8 @@ pub(crate) struct ChainSwap { pub(crate) state: PaymentState, pub(crate) claim_private_key: String, pub(crate) refund_private_key: String, + /// Version used for optimistic concurrency control within local db + pub(crate) version: u64, } impl ChainSwap { pub(crate) fn get_claim_keypair(&self) -> SdkResult { @@ -927,6 +950,8 @@ pub(crate) struct SendSwap { pub(crate) timeout_block_height: u64, pub(crate) state: PaymentState, pub(crate) refund_private_key: String, + /// Version used for optimistic concurrency control within local db + pub(crate) version: u64, } impl SendSwap { pub(crate) fn get_refund_keypair(&self) -> Result { @@ -1021,6 +1046,8 @@ pub(crate) struct ReceiveSwap { pub(crate) created_at: u32, pub(crate) timeout_block_height: u32, pub(crate) state: PaymentState, + /// Version used for optimistic concurrency control within local db + pub(crate) version: u64, } impl ReceiveSwap { pub(crate) fn get_claim_keypair(&self) -> Result { diff --git a/lib/core/src/persist/chain.rs b/lib/core/src/persist/chain.rs index 529c9fe..a9aa82f 100644 --- a/lib/core/src/persist/chain.rs +++ b/lib/core/src/persist/chain.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use boltz_client::swaps::boltz::{ChainSwapDetails, CreateChainResponse}; use rusqlite::{named_params, params, Connection, Row, TransactionBehavior}; use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash}; @@ -59,7 +59,7 @@ impl Persister { ), )?; - con.execute( + let rows_affected = con.execute( "UPDATE chain_swaps SET description = :description, @@ -72,9 +72,10 @@ impl Persister { pair_fees_json = :pair_fees_json, state = :state, actual_payer_amount_sat = :actual_payer_amount_sat, - accepted_receiver_amount_sat = COALESCE(accepted_receiver_amount_sat, :accepted_receiver_amount_sat) + accepted_receiver_amount_sat = :accepted_receiver_amount_sat WHERE - id = :id", + id = :id AND + version = :version", named_params! { ":id": &chain_swap.id, ":description": &chain_swap.description, @@ -88,8 +89,13 @@ impl Persister { ":state": &chain_swap.state, ":actual_payer_amount_sat": &chain_swap.actual_payer_amount_sat, ":accepted_receiver_amount_sat": &chain_swap.accepted_receiver_amount_sat, + ":version": &chain_swap.version, }, )?; + ensure_sdk!( + rows_affected > 0, + anyhow!("Version mismatch for chain swap {}", chain_swap.id) + ); Ok(()) } @@ -153,7 +159,8 @@ impl Persister { state, pair_fees_json, actual_payer_amount_sat, - accepted_receiver_amount_sat + accepted_receiver_amount_sat, + version FROM chain_swaps {where_clause_str} ORDER BY created_at @@ -205,6 +212,7 @@ impl Persister { pair_fees_json: row.get(20)?, actual_payer_amount_sat: row.get(21)?, accepted_receiver_amount_sat: row.get(22)?, + version: row.get(23)?, }) } @@ -487,3 +495,37 @@ impl InternalCreateChainResponse { Ok(res) } } + +#[cfg(test)] +mod tests { + use crate::model::Direction; + use crate::test_utils::chain_swap::new_chain_swap; + use crate::test_utils::persist::create_persister; + use anyhow::Result; + + #[tokio::test] + async fn test_writing_stale_swap() -> Result<()> { + create_persister!(storage); + + let chain_swap = new_chain_swap(Direction::Incoming, None, false, None, false); + storage.insert_or_update_chain_swap(&chain_swap)?; + + // read - update - write works if there are no updates in between + let mut chain_swap = storage.fetch_chain_swap_by_id(&chain_swap.id)?.unwrap(); + chain_swap.claim_tx_id = Some("tx_id".to_string()); + storage.insert_or_update_chain_swap(&chain_swap)?; + + // read - update - write works if there are no updates in between even if no field changes + let chain_swap = storage.fetch_chain_swap_by_id(&chain_swap.id)?.unwrap(); + storage.insert_or_update_chain_swap(&chain_swap)?; + + // read - update - write fails if there are any updates in between + let mut chain_swap = storage.fetch_chain_swap_by_id(&chain_swap.id)?.unwrap(); + chain_swap.claim_tx_id = Some("tx_id_2".to_string()); + // Concurrent update + storage.update_chain_swap_accept_zero_conf(&chain_swap.id, true)?; + assert!(storage.insert_or_update_chain_swap(&chain_swap).is_err()); + + Ok(()) + } +} diff --git a/lib/core/src/persist/migrations.rs b/lib/core/src/persist/migrations.rs index cac369e..4884c42 100644 --- a/lib/core/src/persist/migrations.rs +++ b/lib/core/src/persist/migrations.rs @@ -225,5 +225,28 @@ pub(crate) fn current_migrations() -> Vec<&'static str> { ALTER TABLE receive_swaps ADD COLUMN destination_pubkey TEXT; ALTER TABLE send_swaps ADD COLUMN destination_pubkey TEXT; ", + " + ALTER TABLE receive_swaps ADD COLUMN version INTEGER NOT NULL DEFAULT 0; + ALTER TABLE send_swaps ADD COLUMN version INTEGER NOT NULL DEFAULT 0; + ALTER TABLE chain_swaps ADD COLUMN version INTEGER NOT NULL DEFAULT 0; + CREATE TRIGGER IF NOT EXISTS update_receive_swaps_version + AFTER UPDATE ON receive_swaps + BEGIN + UPDATE receive_swaps SET version = version + 1 + WHERE id = NEW.id; + END; + CREATE TRIGGER IF NOT EXISTS update_send_swaps_version + AFTER UPDATE ON send_swaps + BEGIN + UPDATE send_swaps SET version = version + 1 + WHERE id = NEW.id; + END; + CREATE TRIGGER IF NOT EXISTS update_chain_swaps_version + AFTER UPDATE ON chain_swaps + BEGIN + UPDATE chain_swaps SET version = version + 1 + WHERE id = NEW.id; + END; + ", ] } diff --git a/lib/core/src/persist/receive.rs b/lib/core/src/persist/receive.rs index 6dab84b..b28e221 100644 --- a/lib/core/src/persist/receive.rs +++ b/lib/core/src/persist/receive.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use boltz_client::swaps::boltz::CreateReverseResponse; use rusqlite::{named_params, params, Connection, Row, TransactionBehavior}; use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash}; @@ -60,7 +60,7 @@ impl Persister { ), )?; - con.execute( + let rows_affected = con.execute( "UPDATE receive_swaps SET description = :description, @@ -69,7 +69,8 @@ impl Persister { mrh_tx_id = :mrh_tx_id, state = :state WHERE - id = :id", + id = :id AND + version = :version", named_params! { ":id": &receive_swap.id, ":description": &receive_swap.description, @@ -77,8 +78,13 @@ impl Persister { ":lockup_tx_id": &receive_swap.lockup_tx_id, ":mrh_tx_id": &receive_swap.mrh_tx_id, ":state": &receive_swap.state, + ":version": &receive_swap.version, }, )?; + ensure_sdk!( + rows_affected > 0, + anyhow!("Version mismatch for receive swap {}", receive_swap.id) + ); if receive_swap.mrh_tx_id.is_some() { Self::delete_reserved_address_inner(con, &receive_swap.mrh_address)?; @@ -142,7 +148,8 @@ impl Persister { rs.mrh_tx_id, rs.created_at, rs.state, - rs.pair_fees_json + rs.pair_fees_json, + rs.version FROM receive_swaps AS rs {where_clause_str} ORDER BY rs.created_at @@ -190,6 +197,7 @@ impl Persister { created_at: row.get(16)?, state: row.get(17)?, pair_fees_json: row.get(18)?, + version: row.get(19)?, }) } @@ -446,4 +454,32 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_writing_stale_swap() -> Result<()> { + create_persister!(storage); + + let receive_swap = new_receive_swap(None); + storage.insert_or_update_receive_swap(&receive_swap)?; + + // read - update - write works if there are no updates in between + let mut receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap(); + receive_swap.lockup_tx_id = Some("tx_id".to_string()); + storage.insert_or_update_receive_swap(&receive_swap)?; + + // read - update - write works if there are no updates in between even if no field changes + let receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap(); + storage.insert_or_update_receive_swap(&receive_swap)?; + + // read - update - write fails if there are any updates in between + let mut receive_swap = storage.fetch_receive_swap_by_id(&receive_swap.id)?.unwrap(); + receive_swap.lockup_tx_id = Some("tx_id_2".to_string()); + // Concurrent update + storage.set_receive_swap_claim_tx_id(&receive_swap.id, "tx_id")?; + assert!(storage + .insert_or_update_receive_swap(&receive_swap) + .is_err()); + + Ok(()) + } } diff --git a/lib/core/src/persist/send.rs b/lib/core/src/persist/send.rs index dd29fbf..8779d29 100644 --- a/lib/core/src/persist/send.rs +++ b/lib/core/src/persist/send.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use boltz_client::swaps::boltz::CreateSubmarineResponse; use rusqlite::{named_params, params, Connection, Row}; use sdk_common::bitcoin::hashes::{hex::ToHex, sha256, Hash}; @@ -56,7 +56,7 @@ impl Persister { ), )?; - con.execute( + let rows_affected = con.execute( "UPDATE send_swaps SET description = :description, @@ -65,7 +65,8 @@ impl Persister { refund_tx_id = :refund_tx_id, state = :state WHERE - id = :id", + id = :id AND + version = :version", named_params! { ":id": &send_swap.id, ":description": &send_swap.description, @@ -73,8 +74,13 @@ impl Persister { ":lockup_tx_id": &send_swap.lockup_tx_id, ":refund_tx_id": &send_swap.refund_tx_id, ":state": &send_swap.state, + ":version": &send_swap.version, }, )?; + ensure_sdk!( + rows_affected > 0, + anyhow!("Version mismatch for send swap {}", send_swap.id) + ); Ok(()) } @@ -154,7 +160,8 @@ impl Persister { refund_tx_id, created_at, state, - pair_fees_json + pair_fees_json, + version FROM send_swaps {where_clause_str} ORDER BY created_at @@ -197,6 +204,7 @@ impl Persister { created_at: row.get(14)?, state: row.get(15)?, pair_fees_json: row.get(16)?, + version: row.get(17)?, }) } @@ -385,9 +393,8 @@ impl InternalCreateSubmarineResponse { #[cfg(test)] mod tests { - use anyhow::{anyhow, Result}; - use crate::test_utils::persist::{create_persister, new_send_swap}; + use anyhow::{anyhow, Result}; use super::PaymentState; @@ -475,4 +482,30 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_writing_stale_swap() -> Result<()> { + create_persister!(storage); + + let send_swap = new_send_swap(None); + storage.insert_or_update_send_swap(&send_swap)?; + + // read - update - write works if there are no updates in between + let mut send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap(); + send_swap.refund_tx_id = Some("tx_id".to_string()); + storage.insert_or_update_send_swap(&send_swap)?; + + // read - update - write works if there are no updates in between even if no field changes + let send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap(); + storage.insert_or_update_send_swap(&send_swap)?; + + // read - update - write fails if there are any updates in between + let mut send_swap = storage.fetch_send_swap_by_id(&send_swap.id)?.unwrap(); + send_swap.refund_tx_id = Some("tx_id_2".to_string()); + // Concurrent update + storage.set_send_swap_lockup_tx_id(&send_swap.id, "tx_id")?; + assert!(storage.insert_or_update_send_swap(&send_swap).is_err()); + + Ok(()) + } } diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 4a61b73..9e7ac89 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -1385,6 +1385,7 @@ impl LiquidSdk { created_at: utils::now(), state: PaymentState::Created, refund_private_key: keypair.display_secret().to_string(), + version: 0, }; self.persister.insert_or_update_send_swap(&swap)?; swap @@ -1674,6 +1675,7 @@ impl LiquidSdk { refund_tx_id: None, created_at: utils::now(), state: PaymentState::Created, + version: 0, }; self.persister.insert_or_update_chain_swap(&swap)?; self.status_stream.track_swap_id(&swap_id)?; @@ -2023,6 +2025,7 @@ impl LiquidSdk { mrh_tx_id: None, created_at: utils::now(), state: PaymentState::Created, + version: 0, }) .map_err(|_| PaymentError::PersistError)?; self.status_stream.track_swap_id(&swap_id)?; @@ -2122,6 +2125,7 @@ impl LiquidSdk { refund_tx_id: None, created_at: utils::now(), state: PaymentState::Created, + version: 0, }; self.persister.insert_or_update_chain_swap(&swap)?; self.status_stream.track_swap_id(&swap.id)?; diff --git a/lib/core/src/sync/mod.rs b/lib/core/src/sync/mod.rs index c01889e..807f17e 100644 --- a/lib/core/src/sync/mod.rs +++ b/lib/core/src/sync/mod.rs @@ -4,12 +4,17 @@ use std::time::Duration; use anyhow::{anyhow, Result}; use futures_util::TryFutureExt; -use model::data::PaymentDetailsSyncData; use tokio::sync::mpsc::Receiver; use tokio::sync::{watch, Mutex}; +use self::client::SyncerClient; +use self::model::{data::SyncData, sync::ListChangesRequest, RecordType, SyncState}; +use self::model::{DecryptionError, SyncOutgoingChanges}; use crate::prelude::Swap; use crate::recover::recoverer::Recoverer; +use crate::sync::model::data::{ + ChainSyncData, PaymentDetailsSyncData, ReceiveSyncData, SendSyncData, +}; use crate::sync::model::sync::{Record, SetRecordRequest, SetRecordStatus}; use crate::sync::model::DecryptionInfo; use crate::utils; @@ -18,14 +23,6 @@ use crate::{ prelude::Signer, }; -use self::client::SyncerClient; -use self::model::{ - data::{ChainSyncData, ReceiveSyncData, SendSyncData, SyncData}, - sync::ListChangesRequest, - RecordType, SyncState, -}; -use self::model::{DecryptionError, SyncOutgoingChanges}; - pub(crate) mod client; pub(crate) mod model; @@ -286,7 +283,17 @@ impl SyncService { for decryption_info in swap_decryption_info { let record = &decryption_info.record; match TryInto::::try_into(record.data.clone()) { - Ok(swap) => { + Ok(mut swap) => { + // If there is a local swap, take its version to prevent races between the + // recovery of step 2 and other potential changes occurring in parallel + // (e.g. a refund tx being broadcasted) + if let Ok(version) = self + .persister + .fetch_swap_by_id(&swap.id()) + .map(|s| s.version()) + { + swap.set_version(version); + } succeded.push(decryption_info); swaps.push(swap); } diff --git a/lib/core/src/sync/model/data.rs b/lib/core/src/sync/model/data.rs index b6a1b2c..6bfb593 100644 --- a/lib/core/src/sync/model/data.rs +++ b/lib/core/src/sync/model/data.rs @@ -110,6 +110,7 @@ impl From for ChainSwap { user_lockup_tx_id: None, claim_tx_id: None, refund_tx_id: None, + version: 0, } } } @@ -198,6 +199,7 @@ impl From for SendSwap { state: PaymentState::Created, lockup_tx_id: None, refund_tx_id: None, + version: 0, } } } @@ -281,6 +283,7 @@ impl From for ReceiveSwap { claim_tx_id: None, lockup_tx_id: None, mrh_tx_id: None, + version: 0, } } } diff --git a/lib/core/src/test_utils/chain_swap.rs b/lib/core/src/test_utils/chain_swap.rs index affdaa0..2d39d2e 100644 --- a/lib/core/src/test_utils/chain_swap.rs +++ b/lib/core/src/test_utils/chain_swap.rs @@ -137,6 +137,7 @@ pub(crate) fn new_chain_swap( } }"# .to_string(), + version: 0 }; } match direction { @@ -223,6 +224,7 @@ pub(crate) fn new_chain_swap( } } }"#.to_string(), + version: 0, }, Direction::Outgoing => ChainSwap { id: generate_random_string(4), @@ -306,6 +308,7 @@ pub(crate) fn new_chain_swap( } } }"#.to_string(), + version: 0 } } } diff --git a/lib/core/src/test_utils/persist.rs b/lib/core/src/test_utils/persist.rs index fdbffc8..88e040b 100644 --- a/lib/core/src/test_utils/persist.rs +++ b/lib/core/src/test_utils/persist.rs @@ -86,6 +86,7 @@ pub(crate) fn new_send_swap(payment_state: Option) -> SendSwap { created_at: utils::now(), state: payment_state.unwrap_or(PaymentState::Created), refund_private_key: "945affeef55f12227f1d4a3f80a17062a05b229ddc5a01591eb5ddf882df92e3".to_string(), + version: 0, } } @@ -140,6 +141,7 @@ pub(crate) fn new_receive_swap(payment_state: Option) -> ReceiveSw mrh_tx_id: None, created_at: utils::now(), state: payment_state.unwrap_or(PaymentState::Created), + version: 0, } }