From 24861cb3728839ff6072848e5b4efb65ee644b71 Mon Sep 17 00:00:00 2001 From: ok300 <106775972+ok300@users.noreply.github.com> Date: Wed, 8 May 2024 14:35:50 +0200 Subject: [PATCH] Track swap status updates via websocket event stream --- lib/core/src/boltz_status_stream.rs | 169 ++++++++++++++++++++++++++++ lib/core/src/lib.rs | 1 + lib/core/src/sdk.rs | 16 ++- 3 files changed, 183 insertions(+), 3 deletions(-) create mode 100644 lib/core/src/boltz_status_stream.rs diff --git a/lib/core/src/boltz_status_stream.rs b/lib/core/src/boltz_status_stream.rs new file mode 100644 index 0000000..89bfdc7 --- /dev/null +++ b/lib/core/src/boltz_status_stream.rs @@ -0,0 +1,169 @@ +use std::collections::HashMap; +use std::net::TcpStream; +use std::str::FromStr; +use std::sync::Arc; +use std::thread; + +use anyhow::{anyhow, ensure, Result}; +use boltz_client::swaps::{ + boltz::{RevSwapStates, SubSwapStates}, + boltzv2::{Subscription, SwapUpdate}, +}; +use log::{error, info, warn}; +use tungstenite::stream::MaybeTlsStream; +use tungstenite::{Message, WebSocket}; + +use crate::model::*; +use crate::sdk::LiquidSdk; + +pub(super) struct BoltzStatusStream { + // socket: WebSocket>, +} +impl BoltzStatusStream { + pub(super) fn track_pending_swaps(sdk: Arc) -> Result<()> { + let mut socket = sdk + .boltz_client_v2() + .connect_ws() + .map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?; + + thread::spawn(move || loop { + // Map of (subscribed swap ID, is_swap_out) + let mut subscribed_ids: HashMap = HashMap::new(); + + // Initially subscribe to all ongoing swaps + match sdk.list_ongoing_swaps() { + Ok(initial_ongoing_swaps) => { + info!("Got {} initial ongoing swaps", initial_ongoing_swaps.len()); + + for ongoing_swap in &initial_ongoing_swaps { + let id = &ongoing_swap.id(); + info!("Subscribing to status for initial ongoing swap ID {id}"); + + let subscription = Subscription::new(id); + let subscribe_json = serde_json::to_string(&subscription) + .map_err(|e| anyhow!("Invalid subscription msg: {e:?}")) + .unwrap(); + socket + .send(tungstenite::Message::Text(subscribe_json)) + .map_err(|e| anyhow!("Failed to subscribe to {id}: {e:?}")) + .unwrap(); + + subscribed_ids + .insert(id.clone(), matches!(ongoing_swap, OngoingSwap::Receive(_))); + } + } + Err(e) => error!("Failed to list initial ongoing swaps: {e:?}"), + } + + loop { + match &socket.read() { + Ok(Message::Close(_)) => { + warn!("Received close msg, exiting socket loop"); + break; + } + Ok(msg) => { + info!("Received msg : {msg:?}"); + + // Each time socket.read() returns, we have the opportunity to socket.send(). + // We use this window to subscribe to any new ongoing swaps. + // This happens on any non-close socket messages, in particular: + // Ping (periodic keep-alive), Text (status update) + match sdk.list_ongoing_swaps() { + Ok(ongoing_swaps) => { + let new_ongoing_swaps: Vec = ongoing_swaps + .into_iter() + .filter(|os| !subscribed_ids.contains_key(&os.id())) + .collect(); + for ongoing_swap in &new_ongoing_swaps { + let id = ongoing_swap.id(); + info!("Subscribing to statuses for ongoing swap ID: {id}"); + + let subscription = Subscription::new(&id); + let subscribe_json = serde_json::to_string(&subscription) + .map_err(|e| anyhow!("Invalid subscription msg: {e:?}")) + .unwrap(); + socket + .send(tungstenite::Message::Text(subscribe_json)) + .map_err(|e| anyhow!("Failed to subscribe to {id}: {e:?}")) + .unwrap(); + + subscribed_ids.insert( + id.clone(), + matches!(ongoing_swap, OngoingSwap::Receive(_)), + ); + } + } + Err(e) => error!("Failed to list new ongoing swaps: {e:?}"), + } + + // We parse and handle any Text websocket messages, which are likely status updates + if msg.is_text() { + let response: SwapUpdate = serde_json::from_str(&msg.to_string()) + .map_err(|e| anyhow!("WS response is invalid SwapUpdate: {e:?}")) + .unwrap(); + info!("Received update : {response:?}"); + + match response { + // Subscription confirmation + boltz_client::swaps::boltzv2::SwapUpdate::Subscription { + .. + } => {} + + // Status update + boltz_client::swaps::boltzv2::SwapUpdate::Update { + event, + channel, + args, + } => { + let update = args.first().unwrap().clone(); // TODO + let update_swap_id = update.id.clone(); + let update_state_str = update.status.clone(); + + match subscribed_ids.get(&update_swap_id) { + Some(true) => { + // Known OngoingSwapOut / receive swap + + let new_state = RevSwapStates::from_str(&update_state_str).map_err(|_| { + anyhow!("Invalid state for reverse swap {update_swap_id}: {update_state_str}") + }).unwrap(); + let res = sdk.try_handle_reverse_swap_status( + new_state, + &update_swap_id, + ); + info!("OngoingSwapOut / receive try_handle_reverse_swap_status res: {res:?}"); + } + Some(false) => { + // Known OngoingSwapIn / Send swap + + let new_state = SubSwapStates::from_str(&update_state_str).map_err(|_| { + anyhow!("Invalid state for submarine swap {update_swap_id}: {update_state_str}") + }).unwrap(); + let res = sdk.try_handle_submarine_swap_status( + new_state, + &update_swap_id, + ); + info!("OngoingSwapIn / Send try_handle_submarine_swap_status res: {res:?}"); + } + None => { + // We got an update for a swap we did not track as ongoing + todo!() + } + } + } + + // Error related to subscription, like "Unknown swap ID" + boltz_client::swaps::boltzv2::SwapUpdate::Error { .. } => todo!(), + } + } + } + Err(e) => { + error!("Received stream error : {e:?}"); + break; + } + } + } + }); + + Ok(()) + } +} diff --git a/lib/core/src/lib.rs b/lib/core/src/lib.rs index cd70d01..5e202cb 100644 --- a/lib/core/src/lib.rs +++ b/lib/core/src/lib.rs @@ -1,5 +1,6 @@ #[cfg(feature = "frb")] pub mod bindings; +pub(crate) mod boltz_status_stream; pub mod error; #[cfg(feature = "frb")] pub mod frb; diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index b57be01..12428d4 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -33,7 +33,8 @@ use lwk_wollet::{ }; use crate::{ - ensure_sdk, error::PaymentError, get_invoice_amount, model::*, persist::Persister, utils, + boltz_status_stream::BoltzStatusStream, ensure_sdk, error::PaymentError, get_invoice_amount, + model::*, persist::Persister, utils, }; /// Claim tx feerate, in sats per vbyte. @@ -97,7 +98,8 @@ impl LiquidSdk { data_dir_path, }); - LiquidSdk::track_pending_swaps(&sdk)?; + // LiquidSdk::track_pending_swaps(&sdk)?; + BoltzStatusStream::track_pending_swaps(sdk.clone())?; Ok(sdk) } @@ -194,6 +196,7 @@ impl LiquidSdk { } } + // TODO Not needed anymore with the event stream fn try_resolve_pending_swap(&self, swap: &OngoingSwap) -> Result<()> { let client = self.boltz_client(); let client_v2 = self.boltz_client_v2(); @@ -221,6 +224,7 @@ impl LiquidSdk { Ok(()) } + // TODO Not needed anymore with the event stream fn track_pending_swaps(self: &Arc) -> Result<()> { let cloned = self.clone(); thread::spawn(move || loop { @@ -247,6 +251,10 @@ impl LiquidSdk { Ok(()) } + pub(crate) fn list_ongoing_swaps(&self) -> Result> { + self.persister.list_ongoing_swaps() + } + fn scan(&self) -> Result<(), lwk_wollet::Error> { let mut electrum_client = ElectrumClient::new(&self.electrum_url)?; let mut lwk_wollet = self.lwk_wollet.lock().unwrap(); @@ -288,7 +296,7 @@ impl LiquidSdk { BoltzApiClient::new(base_url) } - fn boltz_client_v2(&self) -> BoltzApiClientV2 { + pub(crate) fn boltz_client_v2(&self) -> BoltzApiClientV2 { let base_url = match self.network { Network::LiquidTestnet => BOLTZ_TESTNET_URL_V2, Network::Liquid => BOLTZ_MAINNET_URL_V2, @@ -576,7 +584,9 @@ impl LiquidSdk { &our_keys, &Preimage::from_str(&ongoing_swap_out.preimage)?, Amount::from_sat(ongoing_swap_out.claim_fees_sat), + // Enable cooperative claim (Some) or not (None) Some((&self.boltz_client_v2(), ongoing_swap_out.id.clone())), + // None )?; claim_tx.broadcast(