Abstract swapper stream as part of swapper trait

This commit is contained in:
Roei Erez
2024-06-03 13:48:26 +03:00
parent ec51aa99b4
commit f7b7149187
7 changed files with 127 additions and 61 deletions

12
cli/Cargo.lock generated
View File

@@ -183,6 +183,17 @@ dependencies = [
"backtrace",
]
[[package]]
name = "async-trait"
version = "0.1.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atomic"
version = "0.5.3"
@@ -408,6 +419,7 @@ name = "breez-liquid-sdk"
version = "0.0.1"
dependencies = [
"anyhow",
"async-trait",
"bip39",
"boltz-client",
"chrono",

12
lib/Cargo.lock generated
View File

@@ -263,6 +263,17 @@ dependencies = [
"toml",
]
[[package]]
name = "async-trait"
version = "0.1.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.61",
]
[[package]]
name = "atomic"
version = "0.5.3"
@@ -512,6 +523,7 @@ name = "breez-liquid-sdk"
version = "0.0.1"
dependencies = [
"anyhow",
"async-trait",
"bip39",
"boltz-client",
"chrono",

View File

@@ -35,6 +35,7 @@ openssl = { version = "0.10", features = ["vendored"] }
tokio = { version = "1", features = ["rt", "macros"] }
url = "2.5.0"
futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
async-trait = "0.1.80"
# Pin these versions to fix iOS build issues
security-framework = "=2.10.0"

View File

@@ -1,6 +1,5 @@
#[cfg(feature = "frb")]
pub mod bindings;
pub(crate) mod boltz_status_stream;
pub mod error;
pub(crate) mod event;
#[cfg(feature = "frb")]

View File

@@ -6,7 +6,7 @@ use std::{
sync::Arc,
time::{Duration, UNIX_EPOCH},
};
use async_trait::async_trait;
use anyhow::{anyhow, Result};
use boltz_client::lightning_invoice::Bolt11InvoiceDescription;
use boltz_client::swaps::boltzv2;
@@ -33,9 +33,8 @@ use tokio::time::MissedTickBehavior;
use crate::error::LiquidSdkError;
use crate::model::PaymentState::*;
use crate::swapper::{BoltzSwapper, Swapper};
use crate::{
boltz_status_stream::BoltzStatusStream,
use crate::swapper::{BoltzSwapper, ReconnectHandler, Swapper, SwapperStatusStream};
use crate::{
ensure_sdk,
error::{LiquidSdkResult, PaymentError},
event::EventManager,
@@ -59,7 +58,7 @@ pub struct LiquidSdk {
lwk_signer: SwSigner,
persister: Arc<Persister>,
event_manager: Arc<EventManager>,
status_stream: Arc<BoltzStatusStream>,
status_stream: Arc<dyn SwapperStatusStream>,
swapper: Arc<dyn Swapper>,
is_started: RwLock<bool>,
shutdown_sender: watch::Sender<()>,
@@ -99,19 +98,19 @@ impl LiquidSdk {
let persister = Arc::new(Persister::new(&config.working_dir, config.network)?);
persister.init()?;
let event_manager = Arc::new(EventManager::new());
let status_stream = Arc::new(BoltzStatusStream::new(&config.boltz_url, persister.clone()));
let event_manager = Arc::new(EventManager::new());
let (shutdown_sender, shutdown_receiver) = watch::channel::<()>(());
let swapper = Arc::new(BoltzSwapper::new(config.clone()));
let status_stream = Arc::<dyn SwapperStatusStream>::from(swapper.create_status_stream());
let sdk = Arc::new(LiquidSdk {
config,
lwk_wollet: Arc::new(Mutex::new(lwk_wollet)),
lwk_signer: opts.signer,
persister,
persister: persister.clone(),
event_manager,
status_stream,
status_stream: status_stream.clone(),
swapper,
is_started: RwLock::new(false),
shutdown_sender,
@@ -159,12 +158,12 @@ impl LiquidSdk {
}
}
});
self.status_stream
.clone()
.track_pending_swaps(self.shutdown_receiver.clone())
.await;
let reconnect_handler = Box::new(SwapperReconnectHandler{
persister: self.persister.clone(),
status_stream: self.status_stream.clone(),
});
self.status_stream.clone().start(reconnect_handler, self.shutdown_receiver.clone()).await;
self.track_swap_updates().await;
self.track_refundable_swaps().await;
@@ -1446,7 +1445,30 @@ impl LiquidSdk {
/// An error is thrown if a global logger is already configured.
pub fn init_logging(log_dir: &str, app_logger: Option<Box<dyn log::Log>>) -> Result<()> {
crate::logger::init_logging(log_dir, app_logger)
}
}
}
struct SwapperReconnectHandler {
persister: Arc<Persister>,
status_stream: Arc<dyn SwapperStatusStream>,
}
#[async_trait]
impl ReconnectHandler for SwapperReconnectHandler {
async fn on_stream_reconnect(&self) {
match self.persister.list_ongoing_swaps() {
Ok(initial_ongoing_swaps) => {
info!("On stream reconnection, got {} initial ongoing swaps", initial_ongoing_swaps.len());
for ongoing_swap in initial_ongoing_swaps {
match self.status_stream.track_swap_id(&ongoing_swap.id()) {
Ok(_) => info!("Tracking ongoing swap: {}", ongoing_swap.id()),
Err(e) => error!("Failed to track ongoing swap: {e:?}"),
}
}
}
Err(e) => error!("Failed to list initial ongoing swaps: {e:?}"),
}
}
}
#[cfg(test)]

View File

@@ -2,6 +2,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use boltz_client::swaps::boltzv2::{self, Subscription, SwapUpdate};
use futures_util::{SinkExt, StreamExt};
use log::{debug, error, info, warn};
@@ -12,39 +13,65 @@ use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use url::Url;
use crate::persist::Persister;
use super::{ReconnectHandler, SwapperStatusStream};
pub(crate) struct BoltzStatusStream {
url: String,
persister: Arc<Persister>,
subscription_notifier: broadcast::Sender<String>,
update_notifier: broadcast::Sender<boltzv2::Update>,
}
impl BoltzStatusStream {
pub(crate) fn new(url: &str, persister: Arc<Persister>) -> Self {
pub(crate) fn new(url: &str) -> Self {
let (subscription_notifier, _) = broadcast::channel::<String>(30);
let (update_notifier, _) = broadcast::channel::<boltzv2::Update>(30);
Self {
url: url.replace("http", "ws") + "/ws",
persister,
subscription_notifier,
update_notifier,
}
}
pub(crate) fn track_swap_id(&self, swap_id: &str) -> Result<()> {
async fn connect(&self) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
let (socket, _) = connect_async(Url::parse(&self.url)?)
.await
.map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?;
Ok(socket)
}
async fn send_subscription(
&self,
swap_id: String,
ws_stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
) {
info!("Subscribing to status updates for swap ID {swap_id}");
let subscription = Subscription::new(&swap_id);
match serde_json::to_string(&subscription) {
Ok(subscribe_json) => match ws_stream.send(Message::Text(subscribe_json)).await {
Ok(_) => info!("Subscribed"),
Err(e) => error!("Failed to subscribe to {swap_id}: {e:?}"),
},
Err(e) => error!("Invalid subscription msg: {e:?}"),
}
}
}
#[async_trait]
impl SwapperStatusStream for BoltzStatusStream {
fn track_swap_id(&self, swap_id: &str) -> Result<()> {
let _ = self.subscription_notifier.send(swap_id.to_string());
Ok(())
}
pub(crate) fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltzv2::Update> {
fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltzv2::Update> {
self.update_notifier.subscribe()
}
pub(crate) async fn track_pending_swaps(
self: Arc<BoltzStatusStream>,
async fn start(
self: Arc<Self>,
callback: Box<dyn ReconnectHandler>,
mut shutdown: watch::Receiver<()>,
) {
let keep_alive_ping_interval = Duration::from_secs(15);
@@ -55,17 +82,7 @@ impl BoltzStatusStream {
debug!("Start of ws stream loop");
match self.connect().await {
Ok(mut ws_stream) => {
// Initially subscribe to all ongoing swaps
match self.persister.list_ongoing_swaps() {
Ok(initial_ongoing_swaps) => {
info!("Got {} initial ongoing swaps", initial_ongoing_swaps.len());
for ongoing_swap in initial_ongoing_swaps {
self.send_subscription(ongoing_swap.id(), &mut ws_stream)
.await;
}
}
Err(e) => error!("Failed to list initial ongoing swaps: {e:?}"),
}
callback.on_stream_reconnect().await;
let mut interval = tokio::time::interval(keep_alive_ping_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
@@ -148,28 +165,4 @@ impl BoltzStatusStream {
}
});
}
async fn connect(&self) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
let (socket, _) = connect_async(Url::parse(&self.url)?)
.await
.map_err(|e| anyhow!("Failed to connect to websocket: {e:?}"))?;
Ok(socket)
}
async fn send_subscription(
&self,
swap_id: String,
ws_stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
) {
info!("Subscribing to status updates for swap ID {swap_id}");
let subscription = Subscription::new(&swap_id);
match serde_json::to_string(&subscription) {
Ok(subscribe_json) => match ws_stream.send(Message::Text(subscribe_json)).await {
Ok(_) => info!("Subscribed"),
Err(e) => error!("Failed to subscribe to {swap_id}: {e:?}"),
},
Err(e) => error!("Invalid subscription msg: {e:?}"),
}
}
}

View File

@@ -1,9 +1,12 @@
mod boltz_status_stream;
use std::str::FromStr;
use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use boltz_client::network::Chain;
use boltz_client::swaps::boltzv2::{
BoltzApiClientV2, ClaimTxResponse, CreateReverseRequest, CreateReverseResponse,
self, BoltzApiClientV2, ClaimTxResponse, CreateReverseRequest, CreateReverseResponse,
CreateSubmarineRequest, CreateSubmarineResponse, ReversePair, SubmarinePair,
};
@@ -11,14 +14,32 @@ use boltz_client::error::Error;
use boltz_client::util::secrets::Preimage;
use boltz_client::{Amount, Bolt11Invoice, LBtcSwapTxV2};
use boltz_status_stream::BoltzStatusStream;
use log::{debug, info};
use lwk_wollet::elements::LockTime;
use serde_json::Value;
use tokio::sync::{broadcast, watch};
use crate::error::PaymentError;
use crate::model::{Config, Network, ReceiveSwap, SendSwap};
use crate::utils;
#[async_trait]
pub trait ReconnectHandler: Send + Sync {
async fn on_stream_reconnect(&self);
}
#[async_trait]
pub trait SwapperStatusStream: Send + Sync {
async fn start(
self: Arc<Self>,
callback: Box<dyn ReconnectHandler>,
shutdown: watch::Receiver<()>,
);
fn track_swap_id(&self, swap_id: &str) -> Result<()>;
fn subscribe_swap_updates(&self) -> broadcast::Receiver<boltzv2::Update>;
}
pub trait Swapper: Send + Sync {
/// Create a new send swap
fn create_send_swap(
@@ -79,6 +100,8 @@ pub trait Swapper: Send + Sync {
/// Chain broadcast
#[allow(dead_code)]
fn broadcast_tx(&self, chain: Chain, tx_hex: &str) -> Result<Value, PaymentError>;
fn create_status_stream(&self) -> Box<dyn SwapperStatusStream>;
}
pub struct BoltzSwapper {
@@ -310,4 +333,8 @@ impl Swapper for BoltzSwapper {
fn broadcast_tx(&self, chain: Chain, tx_hex: &str) -> Result<Value, PaymentError> {
Ok(self.client.broadcast_tx(chain, &tx_hex.into())?)
}
fn create_status_stream(&self) -> Box<dyn SwapperStatusStream> {
Box::new(BoltzStatusStream::new(&self.config.boltz_url))
}
}