Reserved address pool (#531)

This commit is contained in:
Ross Savage
2024-10-30 11:06:10 +01:00
committed by GitHub
parent b3f1eed429
commit 4d036f2529
22 changed files with 671 additions and 103 deletions

2
cli/Cargo.lock generated
View File

@@ -1918,7 +1918,7 @@ dependencies = [
[[package]]
name = "lwk_wollet"
version = "0.7.0"
source = "git+https://github.com/dangeross/lwk?branch=savage-try-headers-subscribe#452bc5e07ab6b9a5fee8c2dae5c9e24d5e3cc9c5"
source = "git+https://github.com/dangeross/lwk?branch=savage-full-scan-to-index#ccc2c70404e07e7c79cb28db460b005f13be931c"
dependencies = [
"aes-gcm-siv",
"base64 0.21.7",

2
lib/Cargo.lock generated
View File

@@ -2121,7 +2121,7 @@ dependencies = [
[[package]]
name = "lwk_wollet"
version = "0.7.0"
source = "git+https://github.com/dangeross/lwk?branch=savage-try-headers-subscribe#452bc5e07ab6b9a5fee8c2dae5c9e24d5e3cc9c5"
source = "git+https://github.com/dangeross/lwk?branch=savage-full-scan-to-index#ccc2c70404e07e7c79cb28db460b005f13be931c"
dependencies = [
"aes-gcm-siv",
"base64 0.21.7",

View File

@@ -30,7 +30,7 @@ impl UniffiBindingLogger {
impl log::Log for UniffiBindingLogger {
fn enabled(&self, m: &Metadata) -> bool {
// ignore the internal uniffi log to prevent infinite loop.
return m.level() <= Level::Trace && *m.target() != *"breez_sdk_liquid_bindings";
m.level() <= Level::Trace && *m.target() != *"breez_sdk_liquid_bindings"
}
fn log(&self, record: &Record) {

View File

@@ -23,7 +23,7 @@ flutter_rust_bridge = { version = "=2.4.0", features = [
log = { workspace = true }
lwk_common = "0.7.0"
lwk_signer = "0.7.0"
lwk_wollet = { git = "https://github.com/dangeross/lwk", branch = "savage-try-headers-subscribe" }
lwk_wollet = { git = "https://github.com/dangeross/lwk", branch = "savage-full-scan-to-index" }
#lwk_wollet = "0.7.0"
rusqlite = { version = "0.31", features = ["backup", "bundled"] }
rusqlite_migration = "1.0"

View File

@@ -225,24 +225,26 @@ impl ChainSwapHandler {
}
async fn rescan_outgoing_chain_swap(&self, swap: &ChainSwap) -> Result<()> {
let address = Address::from_str(&swap.claim_address)?;
let claim_tx_id = swap.claim_tx_id.clone().ok_or(anyhow!("No claim tx id"))?;
let script_pubkey = address.assume_checked().script_pubkey();
let script_history = self
.bitcoin_chain_service
.lock()
.await
.get_script_history(script_pubkey.as_script())?;
let claim_tx_history = script_history
.iter()
.find(|h| h.txid.to_hex().eq(&claim_tx_id) && h.height > 0);
if claim_tx_history.is_some() {
info!(
"Outgoing Chain Swap {} claim tx is confirmed. Setting the swap to Complete",
swap.id
);
self.update_swap_info(&swap.id, Complete, None, None, None, None)
.await?;
if let Some(claim_address) = &swap.claim_address {
let address = Address::from_str(claim_address)?;
let claim_tx_id = swap.claim_tx_id.clone().ok_or(anyhow!("No claim tx id"))?;
let script_pubkey = address.assume_checked().script_pubkey();
let script_history = self
.bitcoin_chain_service
.lock()
.await
.get_script_history(script_pubkey.as_script())?;
let claim_tx_history = script_history
.iter()
.find(|h| h.txid.to_hex().eq(&claim_tx_id) && h.height > 0);
if claim_tx_history.is_some() {
info!(
"Outgoing Chain Swap {} claim tx is confirmed. Setting the swap to Complete",
swap.id
);
self.update_swap_info(&swap.id, Complete, None, None, None, None)
.await?;
}
}
Ok(())
}
@@ -698,9 +700,16 @@ impl ChainSwapHandler {
ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed);
debug!("Initiating claim for Chain Swap {swap_id}");
// Derive a new Liquid address for an incoming swap, or use the set Bitcoin address for an outgoing swap
let claim_address = match swap.direction {
Direction::Incoming => {
Some(self.onchain_wallet.next_unused_address().await?.to_string())
}
Direction::Outgoing => swap.claim_address.clone(),
};
let claim_tx = self
.swapper
.create_claim_tx(Swap::Chain(swap.clone()), None)?;
.create_claim_tx(Swap::Chain(swap.clone()), claim_address)?;
// Set the swap claim_tx_id before broadcasting.
// If another claim_tx_id has been set in the meantime, don't broadcast the claim tx
@@ -910,7 +919,6 @@ impl ChainSwapHandler {
swap.id
);
let refund_address = self.onchain_wallet.next_unused_address().await?.to_string();
let SwapScriptV2::Liquid(swap_script) = swap.get_lockup_swap_script()? else {
return Err(PaymentError::Generic {
err: "Unexpected swap script type found".to_string(),
@@ -925,6 +933,7 @@ impl ChainSwapHandler {
.script_pubkey();
let utxos = liquid_chain_service.get_script_utxos(&script_pk).await?;
let refund_address = self.onchain_wallet.next_unused_address().await?.to_string();
let SdkTransaction::Liquid(refund_tx) = self.swapper.create_refund_tx(
Swap::Chain(swap.clone()),
&refund_address,

View File

@@ -186,6 +186,12 @@ impl From<anyhow::Error> for PaymentError {
}
}
impl From<rusqlite::Error> for PaymentError {
fn from(_: rusqlite::Error) -> Self {
Self::PersistError
}
}
impl From<SdkError> for PaymentError {
fn from(err: SdkError) -> Self {
Self::Generic {

View File

@@ -261,6 +261,16 @@ pub struct ConnectWithSignerRequest {
pub config: Config,
}
/// A reserved address. Once an address is reserved, it can only be
/// reallocated to another payment after the block height expiration.
#[derive(Clone, Debug)]
pub(crate) struct ReservedAddress {
/// The address that is reserved
pub(crate) address: String,
/// The block height that the address is reserved until
pub(crate) expiry_block_height: u32,
}
/// The send/receive methods supported by the SDK
#[derive(Clone, Debug, EnumString, Serialize, Eq, PartialEq)]
pub enum PaymentMethod {
@@ -600,7 +610,8 @@ impl FromSql for Direction {
pub(crate) struct ChainSwap {
pub(crate) id: String,
pub(crate) direction: Direction,
pub(crate) claim_address: String,
/// The Bitcoin claim address is only set for Outgoing Chain Swaps
pub(crate) claim_address: Option<String>,
pub(crate) lockup_address: String,
pub(crate) timeout_block_height: u32,
pub(crate) preimage: String,
@@ -823,8 +834,16 @@ pub(crate) struct ReceiveSwap {
pub(crate) claim_fees_sat: u64,
/// Persisted as soon as a claim tx is broadcast
pub(crate) claim_tx_id: Option<String>,
/// Persisted only when the lockup tx is broadcast
pub(crate) lockup_tx_id: Option<String>,
/// The address reserved for a magic routing hint payment
pub(crate) mrh_address: String,
/// The script pubkey for a magic routing hint payment
pub(crate) mrh_script_pubkey: String,
/// Persisted only if a transaction is sent to the `mrh_address`
pub(crate) mrh_tx_id: Option<String>,
/// Until the lockup tx is seen in the mempool, it contains the swap creation time.
/// Afterwards, it shows the lockup tx creation time.
/// Afterwards, it shows the lockup tx creation time.
pub(crate) created_at: u32,
pub(crate) state: PaymentState,
}

View File

@@ -0,0 +1,151 @@
use anyhow::Result;
use log::debug;
use rusqlite::{Row, Transaction, TransactionBehavior};
use crate::error::PaymentError;
use super::{Persister, ReservedAddress};
impl Persister {
pub(crate) fn next_expired_reserved_address(
&self,
tip: u32,
) -> Result<Option<ReservedAddress>> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
// Get the next expired reserved address
let query = Self::get_reserved_address_query(vec!["expiry_block_height < ?1".to_string()]);
let res = match tx.query_row(&query, [tip], Self::sql_row_to_reserved_address) {
Ok(reserved_address) => {
// Delete the reserved address
Self::delete_reserved_address_inner(&tx, &reserved_address.address)?;
Some(reserved_address)
}
Err(_) => None,
};
tx.commit()?;
Ok(res)
}
fn get_reserved_address_query(where_clauses: Vec<String>) -> String {
let mut where_clause_str = String::new();
if !where_clauses.is_empty() {
where_clause_str = String::from("WHERE ");
where_clause_str.push_str(where_clauses.join(" AND ").as_str());
}
format!(
"
SELECT
address,
expiry_block_height
FROM reserved_addresses
{where_clause_str}
ORDER BY expiry_block_height ASC
LIMIT 1
"
)
}
pub(crate) fn insert_or_update_reserved_address(
&self,
address: &str,
expiry_block_height: u32,
) -> Result<(), PaymentError> {
let con = self.get_connection()?;
con.execute(
"INSERT OR REPLACE INTO reserved_addresses (
address,
expiry_block_height
)
VALUES (?, ?)
",
(&address, expiry_block_height),
)?;
debug!(
"Reserved address {} until block height {}",
address, expiry_block_height
);
Ok(())
}
pub(crate) fn delete_reserved_address(&self, address: &str) -> Result<(), PaymentError> {
let mut con = self.get_connection()?;
let tx = con.transaction()?;
Self::delete_reserved_address_inner(&tx, address)?;
tx.commit()?;
Ok(())
}
fn delete_reserved_address_inner(tx: &Transaction, address: &str) -> Result<(), PaymentError> {
tx.execute(
"DELETE FROM reserved_addresses WHERE address = ?",
[address],
)?;
Ok(())
}
fn sql_row_to_reserved_address(row: &Row) -> rusqlite::Result<ReservedAddress> {
Ok(ReservedAddress {
address: row.get(0)?,
expiry_block_height: row.get(1)?,
})
}
}
#[cfg(test)]
mod tests {
use anyhow::Result;
use crate::test_utils::persist::new_persister;
#[test]
fn test_next_expired_reserved_address() -> Result<()> {
let (_temp_dir, storage) = new_persister()?;
let address = "tlq1pq2amlulhea6ltq7x3eu9atsc2nnrer7yt7xve363zxedqwu2mk6ctcyv9awl8xf28cythreqklt5q0qqwsxzlm6wu4z6d574adl9zh2zmr0h85gt534n";
storage.insert_or_update_reserved_address(&address, 100)?;
let maybe_reserved_address = storage.next_expired_reserved_address(99)?;
// Under the expiry, not popped
assert!(maybe_reserved_address.is_none());
let maybe_reserved_address = storage.next_expired_reserved_address(100)?;
// Equal to expiry, not popped
assert!(maybe_reserved_address.is_none());
let maybe_reserved_address = storage.next_expired_reserved_address(101)?;
// Address expired, popped
assert!(maybe_reserved_address.is_some());
let maybe_reserved_address = storage.next_expired_reserved_address(102)?;
// Address already popped
assert!(maybe_reserved_address.is_none());
Ok(())
}
#[test]
fn test_delete_reserved_address() -> Result<()> {
let (_temp_dir, storage) = new_persister()?;
let address = "tlq1pq2amlulhea6ltq7x3eu9atsc2nnrer7yt7xve363zxedqwu2mk6ctcyv9awl8xf28cythreqklt5q0qqwsxzlm6wu4z6d574adl9zh2zmr0h85gt534n";
storage.insert_or_update_reserved_address(&address, 100)?;
let maybe_reserved_address = storage.next_expired_reserved_address(99)?;
// Under the expiry, not popped
assert!(maybe_reserved_address.is_none());
storage.delete_reserved_address(&address)?;
let maybe_reserved_address = storage.next_expired_reserved_address(101)?;
// Over the expired, but already deleted
assert!(maybe_reserved_address.is_none());
Ok(())
}
}

View File

@@ -1,4 +1,5 @@
use anyhow::Result;
use rusqlite::{Transaction, TransactionBehavior};
use std::str::FromStr;
use super::Persister;
@@ -6,10 +7,12 @@ use super::Persister;
const KEY_SWAPPER_PROXY_URL: &str = "swapper_proxy_url";
const KEY_IS_FIRST_SYNC_COMPLETE: &str = "is_first_sync_complete";
const KEY_WEBHOOK_URL: &str = "webhook_url";
// TODO: The `last_derivation_index` needs to be synced
const KEY_LAST_DERIVATION_INDEX: &str = "last_derivation_index";
impl Persister {
pub fn get_cached_item(&self, key: &str) -> Result<Option<String>> {
let res = self.get_connection()?.query_row(
fn get_cached_item_inner(tx: &Transaction, key: &str) -> Result<Option<String>> {
let res = tx.query_row(
"SELECT value FROM cached_items WHERE key = ?1",
[key],
|row| row.get(0),
@@ -17,21 +20,43 @@ impl Persister {
Ok(res.ok())
}
pub fn update_cached_item(&self, key: &str, value: String) -> Result<()> {
self.get_connection()?.execute(
fn update_cached_item_inner(tx: &Transaction, key: &str, value: String) -> Result<()> {
tx.execute(
"INSERT OR REPLACE INTO cached_items (key, value) VALUES (?1,?2)",
(key, value),
)?;
Ok(())
}
#[allow(dead_code)]
pub fn delete_cached_item(&self, key: &str) -> Result<()> {
self.get_connection()?
.execute("DELETE FROM cached_items WHERE key = ?1", [key])?;
pub fn delete_cached_item_inner(tx: &Transaction, key: &str) -> Result<()> {
tx.execute("DELETE FROM cached_items WHERE key = ?1", [key])?;
Ok(())
}
pub fn get_cached_item(&self, key: &str) -> Result<Option<String>> {
let mut con = self.get_connection()?;
let tx = con.transaction()?;
let res = Self::get_cached_item_inner(&tx, key);
tx.commit()?;
res
}
pub fn update_cached_item(&self, key: &str, value: String) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction()?;
let res = Self::update_cached_item_inner(&tx, key, value);
tx.commit()?;
res
}
pub fn delete_cached_item(&self, key: &str) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction()?;
let res = Self::delete_cached_item_inner(&tx, key);
tx.commit()?;
res
}
pub fn set_swapper_proxy_url(&self, swapper_proxy_url: String) -> Result<()> {
self.update_cached_item(KEY_SWAPPER_PROXY_URL, swapper_proxy_url)
}
@@ -65,6 +90,38 @@ impl Persister {
pub fn get_webhook_url(&self) -> Result<Option<String>> {
self.get_cached_item(KEY_WEBHOOK_URL)
}
pub fn set_last_derivation_index(&self, index: u32) -> Result<()> {
self.update_cached_item(KEY_LAST_DERIVATION_INDEX, index.to_string())
}
pub fn get_last_derivation_index(&self) -> Result<Option<u32>> {
self.get_cached_item(KEY_LAST_DERIVATION_INDEX)
.map(|maybe_str| maybe_str.and_then(|str| str.as_str().parse::<u32>().ok()))
}
pub fn next_derivation_index(&self) -> Result<Option<u32>> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;
let res = match Self::get_cached_item_inner(&tx, KEY_LAST_DERIVATION_INDEX)? {
Some(last_index_str) => {
let next_index = last_index_str
.as_str()
.parse::<u32>()
.map(|index| index + 1)?;
Self::update_cached_item_inner(
&tx,
KEY_LAST_DERIVATION_INDEX,
next_index.to_string(),
)?;
Some(next_index)
}
None => None,
};
tx.commit()?;
Ok(res)
}
}
#[cfg(test)]
@@ -87,4 +144,60 @@ mod tests {
Ok(())
}
#[test]
fn test_get_last_derivation_index() -> Result<()> {
let (_temp_dir, persister) = new_persister()?;
let maybe_last_index = persister.get_last_derivation_index()?;
assert!(maybe_last_index.is_none());
persister.set_last_derivation_index(50)?;
let maybe_last_index = persister.get_last_derivation_index()?;
assert!(maybe_last_index.is_some());
assert_eq!(maybe_last_index, Some(50));
persister.set_last_derivation_index(51)?;
let maybe_last_index = persister.get_last_derivation_index()?;
assert!(maybe_last_index.is_some());
assert_eq!(maybe_last_index, Some(51));
Ok(())
}
#[test]
fn test_next_derivation_index() -> Result<()> {
let (_temp_dir, persister) = new_persister()?;
let maybe_next_index = persister.next_derivation_index()?;
assert!(maybe_next_index.is_none());
persister.set_last_derivation_index(50)?;
let maybe_next_index = persister.next_derivation_index()?;
assert!(maybe_next_index.is_some());
assert_eq!(maybe_next_index, Some(51));
let maybe_last_index = persister.get_last_derivation_index()?;
assert!(maybe_last_index.is_some());
assert_eq!(maybe_last_index, Some(51));
persister.set_last_derivation_index(52)?;
let maybe_next_index = persister.next_derivation_index()?;
assert!(maybe_next_index.is_some());
assert_eq!(maybe_next_index, Some(53));
let maybe_next_index = persister.next_derivation_index()?;
assert!(maybe_next_index.is_some());
assert_eq!(maybe_next_index, Some(54));
let maybe_last_index = persister.get_last_derivation_index()?;
assert!(maybe_last_index.is_some());
assert_eq!(maybe_last_index, Some(54));
Ok(())
}
}

View File

@@ -259,8 +259,7 @@ impl Persister {
":id": swap_id,
":accept_zero_conf": accept_zero_conf,
},
)
.map_err(|_| PaymentError::PersistError)?;
)?;
Ok(())
}
@@ -357,8 +356,7 @@ impl Persister {
":refund_tx_id": refund_tx_id,
":state": to_state,
},
)
.map_err(|_| PaymentError::PersistError)?;
)?;
Ok(())
}

View File

@@ -98,5 +98,90 @@ pub(crate) fn current_migrations() -> Vec<&'static str> {
ALTER TABLE receive_swaps ADD COLUMN payment_hash TEXT;
ALTER TABLE send_swaps ADD COLUMN payment_hash TEXT;
",
"
CREATE TABLE IF NOT EXISTS reserved_addresses (
address TEXT NOT NULL PRIMARY KEY,
expiry_block_height INTEGER NOT NULL
) STRICT;
ALTER TABLE receive_swaps ADD COLUMN mrh_address TEXT NOT NULL DEFAULT '';
ALTER TABLE receive_swaps ADD COLUMN mrh_script_pubkey TEXT NOT NULL DEFAULT '';
ALTER TABLE receive_swaps ADD COLUMN mrh_tx_id TEXT;
",
"
ALTER TABLE chain_swaps RENAME TO old_chain_swaps;
CREATE TABLE IF NOT EXISTS chain_swaps (
id TEXT NOT NULL PRIMARY KEY,
direction INTEGER NOT NULL,
claim_address TEXT,
lockup_address TEXT NOT NULL,
timeout_block_height INTEGER NOT NULL,
preimage TEXT NOT NULL,
payer_amount_sat INTEGER NOT NULL,
receiver_amount_sat INTEGER NOT NULL,
accept_zero_conf INTEGER NOT NULL,
create_response_json TEXT NOT NULL,
claim_private_key TEXT NOT NULL,
refund_private_key TEXT NOT NULL,
server_lockup_tx_id TEXT,
user_lockup_tx_id TEXT,
claim_fees_sat INTEGER NOT NULL,
claim_tx_id TEXT,
refund_tx_id TEXT,
created_at INTEGER NOT NULL,
state INTEGER NOT NULL,
description TEXT,
id_hash TEXT
) STRICT;
INSERT INTO chain_swaps (
id,
direction,
claim_address,
lockup_address,
timeout_block_height,
preimage,
payer_amount_sat,
receiver_amount_sat,
accept_zero_conf,
create_response_json,
claim_private_key,
refund_private_key,
server_lockup_tx_id,
user_lockup_tx_id,
claim_fees_sat,
claim_tx_id,
refund_tx_id,
created_at,
state,
description,
id_hash
) SELECT
id,
direction,
claim_address,
lockup_address,
timeout_block_height,
preimage,
payer_amount_sat,
receiver_amount_sat,
accept_zero_conf,
create_response_json,
claim_private_key,
refund_private_key,
server_lockup_tx_id,
user_lockup_tx_id,
claim_fees_sat,
claim_tx_id,
refund_tx_id,
created_at,
state,
description,
id_hash
FROM old_chain_swaps;
DROP TABLE old_chain_swaps;
",
]
}

View File

@@ -1,3 +1,4 @@
mod address;
mod backup;
mod cache;
pub(crate) mod chain;
@@ -110,8 +111,7 @@ impl Persister {
ptx.payment_type,
ptx.is_confirmed,
),
)
.map_err(|_| PaymentError::PersistError)?;
)?;
if let Some(destination) = destination {
con.execute(
@@ -123,8 +123,7 @@ impl Persister {
VALUES (?, ?, ?)
",
(ptx.tx_id, destination, description),
)
.map_err(|_| PaymentError::PersistError)?;
)?;
}
Ok(())
@@ -204,9 +203,9 @@ impl Persister {
FROM payment_tx_data AS ptx -- Payment tx (each tx results in a Payment)
FULL JOIN (
SELECT * FROM receive_swaps
WHERE claim_tx_id IS NOT NULL OR lockup_tx_id IS NOT NULL
WHERE COALESCE(claim_tx_id, lockup_tx_id, mrh_tx_id) IS NOT NULL
) rs -- Receive Swap data (by claim)
ON ptx.tx_id = rs.claim_tx_id
ON ptx.tx_id in (rs.claim_tx_id, rs.mrh_tx_id)
LEFT JOIN send_swaps AS ss -- Send Swap data
ON ptx.tx_id = ss.lockup_tx_id
LEFT JOIN chain_swaps AS cs -- Chain Swap data

View File

@@ -25,12 +25,12 @@ impl Persister {
claim_private_key,
invoice,
payment_hash,
description,
payer_amount_sat,
receiver_amount_sat,
created_at,
claim_fees_sat,
claim_tx_id,
mrh_address,
mrh_script_pubkey,
state
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
@@ -44,15 +44,31 @@ impl Persister {
&receive_swap.claim_private_key,
&receive_swap.invoice,
&receive_swap.payment_hash,
&receive_swap.description,
&receive_swap.payer_amount_sat,
&receive_swap.receiver_amount_sat,
&receive_swap.created_at,
&receive_swap.claim_fees_sat,
&receive_swap.claim_tx_id,
&receive_swap.mrh_address,
&receive_swap.mrh_script_pubkey,
&receive_swap.state,
))?;
con.execute(
"UPDATE receive_swaps
SET
description = :description,
claim_tx_id = :claim_tx_id,
mrh_tx_id = :mrh_tx_id
WHERE
id = :id",
named_params! {
":id": &receive_swap.id,
":description": &receive_swap.description,
":claim_tx_id": &receive_swap.claim_tx_id,
":mrh_tx_id": &receive_swap.mrh_tx_id,
},
)?;
Ok(())
}
@@ -77,6 +93,10 @@ impl Persister {
rs.receiver_amount_sat,
rs.claim_fees_sat,
rs.claim_tx_id,
rs.lockup_tx_id,
rs.mrh_address,
rs.mrh_script_pubkey,
rs.mrh_tx_id,
rs.created_at,
rs.state
FROM receive_swaps AS rs
@@ -118,8 +138,12 @@ impl Persister {
receiver_amount_sat: row.get(8)?,
claim_fees_sat: row.get(9)?,
claim_tx_id: row.get(10)?,
created_at: row.get(11)?,
state: row.get(12)?,
lockup_tx_id: row.get(11)?,
mrh_address: row.get(12)?,
mrh_script_pubkey: row.get(13)?,
mrh_tx_id: row.get(14)?,
created_at: row.get(15)?,
state: row.get(16)?,
})
}
@@ -165,6 +189,28 @@ impl Persister {
Ok(res)
}
/// Ongoing Receive Swaps with no claim or lockup transactions, indexed by mrh_script_pubkey
pub(crate) fn list_ongoing_receive_swaps_by_mrh_script_pubkey(
&self,
) -> Result<HashMap<String, ReceiveSwap>> {
let con: Connection = self.get_connection()?;
let res = self
.list_ongoing_receive_swaps(&con)?
.iter()
.filter_map(|swap| {
match (
swap.lockup_tx_id.clone(),
swap.claim_tx_id.clone(),
swap.mrh_script_pubkey.is_empty(),
) {
(None, None, false) => Some((swap.mrh_script_pubkey.clone(), swap.clone())),
_ => None,
}
})
.collect();
Ok(res)
}
/// Pending Receive Swaps, indexed by claim_tx_id
pub(crate) fn list_pending_receive_swaps_by_claim_tx_id(
&self,
@@ -232,6 +278,8 @@ impl Persister {
to_state: PaymentState,
claim_tx_id: Option<&str>,
lockup_tx_id: Option<&str>,
mrh_tx_id: Option<&str>,
mrh_amount_sat: Option<u64>,
) -> Result<(), PaymentError> {
// Do not overwrite claim_tx_id or lockup_tx_id
let con: Connection = self.get_connection()?;
@@ -248,6 +296,13 @@ impl Persister {
WHEN lockup_tx_id IS NULL THEN :lockup_tx_id
ELSE lockup_tx_id
END,
mrh_tx_id =
CASE
WHEN mrh_tx_id IS NULL THEN :mrh_tx_id
ELSE mrh_tx_id
END,
payer_amount_sat = COALESCE(:mrh_amount_sat, payer_amount_sat),
receiver_amount_sat = COALESCE(:mrh_amount_sat, receiver_amount_sat),
state = :state
WHERE
id = :id",
@@ -255,10 +310,11 @@ impl Persister {
":id": swap_id,
":lockup_tx_id": lockup_tx_id,
":claim_tx_id": claim_tx_id,
":mrh_tx_id": mrh_tx_id,
":mrh_amount_sat": mrh_amount_sat,
":state": to_state,
},
)
.map_err(|_| PaymentError::PersistError)?;
)?;
Ok(())
}
@@ -364,7 +420,14 @@ mod tests {
let new_state = PaymentState::Pending;
let claim_tx_id = Some("claim_tx_id");
storage.try_handle_receive_swap_update(&receive_swap.id, new_state, claim_tx_id, None)?;
storage.try_handle_receive_swap_update(
&receive_swap.id,
new_state,
claim_tx_id,
None,
None,
None,
)?;
let updated_receive_swap = storage
.fetch_receive_swap_by_id(&receive_swap.id)?

View File

@@ -71,8 +71,7 @@ impl Persister {
":from_state": from_state,
":to_state": to_state,
},
)
.map_err(|_| PaymentError::PersistError)?;
)?;
Ok(())
}
@@ -236,8 +235,7 @@ impl Persister {
":refund_tx_id": refund_tx_id,
":state": to_state,
},
)
.map_err(|_| PaymentError::PersistError)?;
)?;
Ok(())
}

View File

@@ -76,8 +76,16 @@ impl ReceiveSwapHandler {
| RevSwapStates::TransactionFailed
| RevSwapStates::TransactionRefunded,
) => {
error!("Swap {id} entered into an unrecoverable state: {swap_state:?}");
self.update_swap_info(id, Failed, None, None).await?;
match receive_swap.mrh_tx_id {
Some(mrh_tx_id) => {
warn!("Swap {id} is expired but MRH payment was received: txid {mrh_tx_id}")
}
None => {
error!("Swap {id} entered into an unrecoverable state: {swap_state:?}");
self.update_swap_info(id, Failed, None, None, None, None)
.await?;
}
}
Ok(())
}
// The lockup tx is in the mempool and we accept 0-conf => try to claim
@@ -93,6 +101,13 @@ impl ReceiveSwapHandler {
));
}
// Do not continue or claim the swap if it was already paid via MRH
if let Some(mrh_tx_id) = receive_swap.mrh_tx_id {
return Err(anyhow!(
"MRH tx for Receive Swap {id} was already broadcast, ignoring swap: txid {mrh_tx_id}"
));
}
// looking for lockup script history to verify lockup was broadcasted
if let Err(e) = self
.verify_lockup_tx(&receive_swap, &transaction, false)
@@ -107,7 +122,7 @@ impl ReceiveSwapHandler {
info!("swapper lockup was verified");
let lockup_tx_id = &transaction.id;
self.update_swap_info(id, Pending, None, Some(lockup_tx_id))
self.update_swap_info(id, Pending, None, Some(lockup_tx_id), None, None)
.await?;
let lockup_tx = utils::deserialize_tx_hex(&transaction.hex)?;
@@ -163,6 +178,13 @@ impl ReceiveSwapHandler {
return Err(anyhow!("Unexpected payload from Boltz status stream"));
};
// Do not continue or claim the swap if it was already paid via MRH
if let Some(mrh_tx_id) = receive_swap.mrh_tx_id {
return Err(anyhow!(
"MRH tx for Receive Swap {id} was already broadcast, ignoring swap: txid {mrh_tx_id}"
));
}
// looking for lockup script history to verify lockup was broadcasted and confirmed
if let Err(e) = self
.verify_lockup_tx(&receive_swap, &transaction, true)
@@ -181,7 +203,7 @@ impl ReceiveSwapHandler {
warn!("Claim tx for Receive Swap {id} was already broadcast: txid {claim_tx_id}")
}
None => {
self.update_swap_info(&receive_swap.id, Pending, None, None)
self.update_swap_info(&receive_swap.id, Pending, None, None, None, None)
.await?;
match self.claim(id).await {
Ok(_) => {}
@@ -215,9 +237,12 @@ impl ReceiveSwapHandler {
to_state: PaymentState,
claim_tx_id: Option<&str>,
lockup_tx_id: Option<&str>,
mrh_tx_id: Option<&str>,
mrh_amount_sat: Option<u64>,
) -> Result<(), PaymentError> {
info!(
"Transitioning Receive swap {swap_id} to {to_state:?} (claim_tx_id = {claim_tx_id:?}, lockup_tx_id = {lockup_tx_id:?})"
"Transitioning Receive swap {} to {:?} (claim_tx_id = {:?}, lockup_tx_id = {:?}, mrh_tx_id = {:?})",
swap_id, to_state, claim_tx_id, lockup_tx_id, mrh_tx_id
);
let swap = self
@@ -229,6 +254,7 @@ impl ReceiveSwapHandler {
})?;
let payment_id = claim_tx_id
.or(lockup_tx_id)
.or(mrh_tx_id)
.map(|id| id.to_string())
.or(swap.claim_tx_id);
@@ -238,6 +264,8 @@ impl ReceiveSwapHandler {
to_state,
claim_tx_id,
lockup_tx_id,
mrh_tx_id,
mrh_amount_sat,
)?;
if let Some(payment_id) = payment_id {
@@ -301,8 +329,15 @@ impl ReceiveSwapHandler {
)?;
info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}");
self.update_swap_info(swap_id, Pending, Some(&claim_tx_id), None)
.await
self.update_swap_info(
swap_id,
Pending,
Some(&claim_tx_id),
None,
None,
None,
)
.await
}
Err(err) => {
// Multiple attempts to broadcast have failed. Unset the swap claim_tx_id
@@ -435,7 +470,7 @@ mod tests {
storage.insert_receive_swap(&receive_swap)?;
assert!(receive_swap_state_handler
.update_swap_info(&receive_swap.id, *allowed_state, None, None)
.update_swap_info(&receive_swap.id, *allowed_state, None, None, None, None)
.await
.is_ok());
}
@@ -459,7 +494,7 @@ mod tests {
storage.insert_receive_swap(&receive_swap)?;
assert!(receive_swap_state_handler
.update_swap_info(&receive_swap.id, *disallowed_state, None, None)
.update_swap_info(&receive_swap.id, *disallowed_state, None, None, None, None)
.await
.is_err());
}

View File

@@ -149,15 +149,17 @@ impl LiquidSdk {
let fingerprint_hex: String =
Xpub::decode(signer.xpub()?.as_slice())?.identifier()[0..4].to_hex();
let working_dir = config.get_wallet_working_dir(fingerprint_hex)?;
let onchain_wallet = Arc::new(LiquidOnchainWallet::new(
signer.clone(),
config.clone(),
&working_dir,
)?);
let persister = Arc::new(Persister::new(&working_dir, config.network)?);
persister.init()?;
let onchain_wallet = Arc::new(LiquidOnchainWallet::new(
config.clone(),
&working_dir,
persister.clone(),
signer.clone(),
)?);
let event_manager = Arc::new(EventManager::new());
let (shutdown_sender, shutdown_receiver) = watch::channel::<()>(());
@@ -494,11 +496,6 @@ impl LiquidSdk {
/// Get the wallet info, calculating the current pending and confirmed balances.
pub async fn get_info(&self) -> Result<GetInfoResponse> {
self.ensure_is_started().await?;
debug!(
"next_unused_address: {}",
self.onchain_wallet.next_unused_address().await?
);
let mut pending_send_sat = 0;
let mut pending_receive_sat = 0;
let mut confirmed_sent_sat = 0;
@@ -1316,12 +1313,11 @@ impl LiquidSdk {
let accept_zero_conf = server_lockup_amount_sat <= pair.limits.maximal_zero_conf;
let payer_amount_sat = req.prepare_response.total_fees_sat + receiver_amount_sat;
let claim_address = req.address.clone();
let swap = ChainSwap {
id: swap_id.clone(),
direction: Direction::Outgoing,
claim_address,
claim_address: Some(req.address.clone()),
lockup_address: create_response.lockup_details.lockup_address,
timeout_block_height: create_response.lockup_details.timeout_block_height,
preimage: preimage_str,
@@ -1608,6 +1604,12 @@ impl LiquidSdk {
};
let create_response = self.swapper.create_receive_swap(v2_req)?;
// Reserve this address until the timeout block height
self.persister.insert_or_update_reserved_address(
&mrh_addr_str,
create_response.timeout_block_height,
)?;
// Check if correct MRH was added to the invoice by Boltz
let (bip21_lbtc_address, _bip21_amount_btc) = self
.swapper
@@ -1658,6 +1660,10 @@ impl LiquidSdk {
receiver_amount_sat,
claim_fees_sat: reverse_pair.fees.claim_estimate(),
claim_tx_id: None,
lockup_tx_id: None,
mrh_address: mrh_addr_str,
mrh_script_pubkey: mrh_addr.to_unconfidential().script_pubkey().to_hex(),
mrh_tx_id: None,
created_at: utils::now(),
state: PaymentState::Created,
})
@@ -1724,12 +1730,11 @@ impl LiquidSdk {
let accept_zero_conf = user_lockup_amount_sat <= pair.limits.maximal_zero_conf;
let receiver_amount_sat = user_lockup_amount_sat - fees_sat;
let claim_address = self.onchain_wallet.next_unused_address().await?.to_string();
let swap = ChainSwap {
id: swap_id.clone(),
direction: Direction::Incoming,
claim_address,
claim_address: None,
lockup_address: create_response.lockup_details.lockup_address,
timeout_block_height: create_response.lockup_details.timeout_block_height,
preimage: preimage_str,
@@ -1970,6 +1975,9 @@ impl LiquidSdk {
let pending_receive_swaps_by_claim_tx_id =
self.persister.list_pending_receive_swaps_by_claim_tx_id()?;
let ongoing_receive_swaps_by_mrh_script_pubkey = self
.persister
.list_ongoing_receive_swaps_by_mrh_script_pubkey()?;
let pending_send_swaps_by_refund_tx_id =
self.persister.list_pending_send_swaps_by_refund_tx_id()?;
let pending_chain_swaps_by_claim_tx_id =
@@ -1989,6 +1997,12 @@ impl LiquidSdk {
let tx_id = tx.txid.to_string();
let is_tx_confirmed = tx.height.is_some();
let amount_sat = tx.balance.values().sum::<i64>();
let maybe_script_pubkey = tx
.outputs
.iter()
.find(|output| output.is_some())
.and_then(|output| output.clone().map(|o| o.script_pubkey.to_hex()));
let mrh_script_pubkey = maybe_script_pubkey.clone().unwrap_or_default();
self.persister.insert_or_update_payment(
PaymentTxData {
@@ -2002,19 +2016,36 @@ impl LiquidSdk {
},
is_confirmed: is_tx_confirmed,
},
match tx.outputs.iter().find(|output| output.is_some()) {
Some(Some(output)) => Some(output.script_pubkey.to_hex()),
_ => None,
},
maybe_script_pubkey,
None,
)?;
if let Some(swap) = pending_receive_swaps_by_claim_tx_id.get(&tx_id) {
if is_tx_confirmed {
self.receive_swap_handler
.update_swap_info(&swap.id, Complete, None, None)
.update_swap_info(&swap.id, Complete, None, None, None, None)
.await?;
}
} else if let Some(swap) =
ongoing_receive_swaps_by_mrh_script_pubkey.get(&mrh_script_pubkey)
{
// Update the swap status according to the MRH tx confirmation state
let to_state = match is_tx_confirmed {
true => Complete,
false => Pending,
};
self.receive_swap_handler
.update_swap_info(
&swap.id,
to_state,
None,
None,
Some(&tx_id),
Some(amount_sat.unsigned_abs()),
)
.await?;
// Remove the used MRH address from the reserved addresses
self.persister.delete_reserved_address(&swap.mrh_address)?;
} else if let Some(swap) = pending_send_swaps_by_refund_tx_id.get(&tx_id) {
if is_tx_confirmed {
self.send_swap_handler
@@ -2034,8 +2065,7 @@ impl LiquidSdk {
.await?;
}
} else {
// Payments that are not directly associated with a swap (e.g. direct onchain payments using MRH)
// Payments that are not directly associated with a swap
match payments_before_sync.get(&tx_id) {
None => {
// A completely new payment brought in by this sync, in mempool or confirmed

View File

@@ -101,12 +101,13 @@ impl BoltzSwapper {
pub(crate) fn new_outgoing_chain_claim_tx(
&self,
swap: &ChainSwap,
claim_address: String,
) -> Result<Transaction, PaymentError> {
let claim_keypair = swap.get_claim_keypair()?;
let claim_swap_script = swap.get_claim_swap_script()?.as_bitcoin_script()?;
let claim_tx_wrapper = BtcSwapTx::new_claim(
claim_swap_script,
swap.claim_address.clone(),
claim_address,
&self.bitcoin_electrum_config,
self.boltz_url.clone(),
swap.id.clone(),

View File

@@ -72,12 +72,13 @@ impl BoltzSwapper {
pub(crate) fn new_incoming_chain_claim_tx(
&self,
swap: &ChainSwap,
claim_address: String,
) -> Result<Transaction, PaymentError> {
let claim_keypair = swap.get_claim_keypair()?;
let swap_script = swap.get_claim_swap_script()?.as_liquid_script()?;
let claim_tx_wrapper = LBtcSwapTx::new_claim(
swap_script,
swap.claim_address.clone(),
claim_address,
&self.liquid_electrum_config,
self.boltz_url.clone(),
swap.id.clone(),

View File

@@ -252,12 +252,24 @@ impl Swapper for BoltzSwapper {
claim_address: Option<String>,
) -> Result<Transaction, PaymentError> {
let tx = match &swap {
Swap::Chain(swap) => match swap.direction {
Direction::Incoming => Transaction::Liquid(self.new_incoming_chain_claim_tx(swap)?),
Direction::Outgoing => {
Transaction::Bitcoin(self.new_outgoing_chain_claim_tx(swap)?)
Swap::Chain(swap) => {
let Some(claim_address) = claim_address else {
return Err(PaymentError::Generic {
err: format!(
"No claim address was supplied when claiming for Chain swap {}",
swap.id
),
});
};
match swap.direction {
Direction::Incoming => {
Transaction::Liquid(self.new_incoming_chain_claim_tx(swap, claim_address)?)
}
Direction::Outgoing => {
Transaction::Bitcoin(self.new_outgoing_chain_claim_tx(swap, claim_address)?)
}
}
},
}
Swap::Receive(swap) => {
let Some(claim_address) = claim_address else {
return Err(PaymentError::Generic {

View File

@@ -53,7 +53,7 @@ pub(crate) fn new_chain_swap(
Direction::Incoming => ChainSwap {
id: generate_random_string(4),
direction,
claim_address: "tlq1qq0nn497zr4l6nfq84pxzqwme87n7kz09lvnx94t7ecw045dvjr09s9s6ens46nt7qcrmx673vq6gkss50qhpcxywt3r5a44j2".to_string(),
claim_address: None,
lockup_address: "tb1p7cftn5u3ndt8ln0m6hruwyhsz8kc5sxt557ua03qcew0z29u5paqh8f7uu".to_string(),
timeout_block_height: 2868778,
preimage: "bbce422d96c0386c3a6c1b1fe11fc7be3fdd871c6855db6ab2e319e96ec19c78".to_string(),
@@ -116,7 +116,7 @@ pub(crate) fn new_chain_swap(
Direction::Outgoing => ChainSwap {
id: generate_random_string(4),
direction,
claim_address: "14DeLtifrayJXAWft3qhPbdY4HVJUgMyx1".to_string(),
claim_address: Some("14DeLtifrayJXAWft3qhPbdY4HVJUgMyx1".to_string()),
lockup_address: "tlq1pqg4e5r5a59gdl26ud6s7gna3mchqs20ycwl2lp67ejzy69fl7dwccwx9nqtr6ef848k7vpmvmdhsyeq2wp3vtn3gnlenhd0wrasv4qvr2dk0nz5tu0rw".to_string(),
timeout_block_height: 1481523,
preimage: "a95a028483df6112c15fdef513d9d8255ff0951d5c0856f85cf9c98352a0f71a".to_string(),

View File

@@ -103,6 +103,10 @@ pub(crate) fn new_receive_swap(payment_state: Option<PaymentState>) -> ReceiveSw
receiver_amount_sat: 587,
claim_fees_sat: 200,
claim_tx_id: None,
lockup_tx_id: None,
mrh_address: "tlq1pq2amlulhea6ltq7x3eu9atsc2nnrer7yt7xve363zxedqwu2mk6ctcyv9awl8xf28cythreqklt5q0qqwsxzlm6wu4z6d574adl9zh2zmr0h85gt534n".to_string(),
mrh_script_pubkey: "tex1qnkznyyxwnxnkk0j94cnvq27h24jk6sqf0te55x".to_string(),
mrh_tx_id: None,
created_at: utils::now(),
state: payment_state.unwrap_or(PaymentState::Created),
}

View File

@@ -4,6 +4,7 @@ use std::{str::FromStr, sync::Arc};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use boltz_client::ElementsAddress;
use log::debug;
use lwk_common::Signer as LwkSigner;
use lwk_common::{singlesig_desc, Singlesig};
use lwk_wollet::{
@@ -17,6 +18,7 @@ use sdk_common::lightning::util::message_signing::verify;
use tokio::sync::Mutex;
use crate::model::Signer;
use crate::persist::Persister;
use crate::signer::SdkLwkSigner;
use crate::{
ensure_sdk,
@@ -79,16 +81,18 @@ pub trait OnchainWallet: Send + Sync {
}
pub(crate) struct LiquidOnchainWallet {
wallet: Arc<Mutex<Wollet>>,
config: Config,
persister: Arc<Persister>,
wallet: Arc<Mutex<Wollet>>,
pub(crate) signer: SdkLwkSigner,
}
impl LiquidOnchainWallet {
pub(crate) fn new(
user_signer: Arc<Box<dyn Signer>>,
config: Config,
working_dir: &String,
persister: Arc<Persister>,
user_signer: Arc<Box<dyn Signer>>,
) -> Result<Self> {
let signer = crate::signer::SdkLwkSigner::new(user_signer)?;
let descriptor = LiquidOnchainWallet::get_descriptor(&signer, config.network)?;
@@ -97,9 +101,10 @@ impl LiquidOnchainWallet {
let lwk_persister = FsPersister::new(working_dir, elements_network, &descriptor)?;
let wollet = Wollet::new(elements_network, lwk_persister, descriptor)?;
Ok(Self {
config,
persister,
wallet: Arc::new(Mutex::new(wollet)),
signer,
config,
})
}
@@ -206,7 +211,33 @@ impl OnchainWallet for LiquidOnchainWallet {
/// Get the next unused address in the wallet
async fn next_unused_address(&self) -> Result<Address, PaymentError> {
Ok(self.wallet.lock().await.address(None)?.address().clone())
let tip = self.tip().await.height();
let address = match self.persister.next_expired_reserved_address(tip)? {
Some(reserved_address) => {
debug!(
"Got reserved address {} that expired on block height {}",
reserved_address.address, reserved_address.expiry_block_height
);
ElementsAddress::from_str(&reserved_address.address)
.map_err(|e| PaymentError::Generic { err: e.to_string() })?
}
None => {
let next_index = self.persister.next_derivation_index()?;
let address_result = self.wallet.lock().await.address(next_index)?;
let address = address_result.address().clone();
let index = address_result.index();
debug!(
"Got unused address {} with derivation index {}",
address, index
);
if next_index.is_none() {
self.persister.set_last_derivation_index(index)?;
}
address
}
};
Ok(address)
}
/// Get the current tip of the blockchain the wallet is aware of
@@ -232,7 +263,15 @@ impl OnchainWallet for LiquidOnchainWallet {
true,
true,
))?;
lwk_wollet::full_scan_with_electrum_client(&mut wallet, &mut electrum_client)?;
let index = self
.persister
.get_last_derivation_index()?
.unwrap_or_default();
lwk_wollet::full_scan_to_index_with_electrum_client(
&mut wallet,
index,
&mut electrum_client,
)?;
Ok(())
}
@@ -259,6 +298,7 @@ mod tests {
use super::*;
use crate::model::Config;
use crate::signer::SdkSigner;
use crate::test_utils::persist::new_persister;
use crate::wallet::LiquidOnchainWallet;
use tempfile::TempDir;
@@ -274,8 +314,12 @@ mod tests {
let temp_dir = TempDir::new().unwrap();
let working_dir = temp_dir.path().to_str().unwrap().to_string();
let wallet: Arc<dyn OnchainWallet> =
Arc::new(LiquidOnchainWallet::new(sdk_signer.clone(), config, &working_dir).unwrap());
let (_temp_dir, storage) = new_persister().unwrap();
let storage = Arc::new(storage);
let wallet: Arc<dyn OnchainWallet> = Arc::new(
LiquidOnchainWallet::new(config, &working_dir, storage, sdk_signer.clone()).unwrap(),
);
// Test message
let message = "Hello, Liquid!";