This commit is contained in:
irriden
2024-02-13 16:54:11 +00:00
parent 00d495b84c
commit 7ab755c670
4 changed files with 44 additions and 46 deletions

View File

@@ -7,8 +7,6 @@ use std::sync::Mutex;
pub static CONNS: Lazy<Mutex<Connections>> = Lazy::new(|| Mutex::new(Connections::new()));
pub static HSMD_INIT: Mutex<Vec<u8>> = Mutex::new(Vec::new());
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Connections {
pub pubkey: Option<String>,

View File

@@ -1,6 +1,4 @@
use crate::bitcoin::blockdata::constants::ChainHash;
use crate::bitcoin::Network;
use crate::conn::{ChannelRequest, LssReq, HSMD_INIT};
use crate::conn::{ChannelRequest, LssReq};
use crate::handle::handle_message;
use crate::secp256k1::PublicKey;
use log::*;
@@ -77,16 +75,16 @@ impl<C: 'static + Client> SignerLoop<C> {
}
/// Start the read loop
pub fn start(&mut self, network: Option<Network>) {
pub fn start(&mut self) {
info!("loop {}: start", self.log_prefix);
match self.do_loop(network) {
match self.do_loop() {
Ok(()) => info!("loop {}: done", self.log_prefix),
Err(Error::Eof) => info!("loop {}: ending", self.log_prefix),
Err(e) => error!("loop {}: error {:?}", self.log_prefix, e),
}
}
fn do_loop(&mut self, network: Option<Network>) -> Result<()> {
fn do_loop(&mut self) -> Result<()> {
loop {
let raw_msg = self.client.read_raw()?;
// debug!("loop {}: got raw", self.log_prefix);
@@ -108,7 +106,7 @@ impl<C: 'static + Client> SignerLoop<C> {
self.vls_tx.clone(),
client_id,
);
thread::spawn(move || new_loop.start(None));
thread::spawn(move || new_loop.start());
}
Message::Memleak(_) => {
// info!("Memleak");
@@ -116,20 +114,8 @@ impl<C: 'static + Client> SignerLoop<C> {
self.client.write(reply)?;
}
msg => {
if let Message::HsmdInit(ref m) = msg {
if let Some(net) = network {
if ChainHash::using_genesis_block(net).as_bytes()
!= m.chain_params.as_ref()
{
panic!("The network settings of CLN and broker don't match!");
}
} else {
log::error!("No Network provided");
}
let mut hsmd_raw = HSMD_INIT.lock().unwrap();
*hsmd_raw = raw_msg;
drop(hsmd_raw);
continue;
if let Message::HsmdInit(ref _m) = msg {
panic!("HsmdInit should have been handled already!");
}
// check if we got the same preapprove message less than PREAPPROVE_CACHE_TTL seconds ago
if let Message::PreapproveInvoice(_) | Message::PreapproveKeysend(_) = msg {

View File

@@ -1,4 +1,3 @@
use crate::conn::HSMD_INIT;
use crate::conn::{ChannelRequest, LssReq};
use anyhow::{anyhow, Result};
use lss_connector::{InitResponse, LssBroker, Response, SignerMutations};
@@ -17,12 +16,13 @@ pub fn lss_tasks(
mut conn_rx: mpsc::Receiver<(String, oneshot::Sender<bool>)>,
init_tx: mpsc::Sender<ChannelRequest>,
mut cln_client: UnixClient,
mut hsmd_raw: Vec<u8>,
) {
tokio::task::spawn(async move {
// first connection - initializes lssbroker
let (lss_conn, hsmd_init_reply) = loop {
let (cid, dance_complete_tx) = conn_rx.recv().await.unwrap();
match try_dance(&cid, &uri, None, &init_tx, dance_complete_tx).await {
match try_dance(&cid, &uri, None, &init_tx, dance_complete_tx, &mut hsmd_raw).await {
Some(ret) => break ret,
None => log::warn!("broker not initialized, try connecting again..."),
}
@@ -32,7 +32,15 @@ pub fn lss_tasks(
// connect handler for all subsequent connections
while let Some((cid, dance_complete_tx)) = conn_rx.recv().await {
log::info!("CLIENT {} connected!", cid);
let _ = try_dance(&cid, &uri, Some(&lss_conn), &init_tx, dance_complete_tx).await;
let _ = try_dance(
&cid,
&uri,
Some(&lss_conn),
&init_tx,
dance_complete_tx,
&mut hsmd_raw,
)
.await;
}
});
}
@@ -58,8 +66,9 @@ async fn try_dance(
lss_conn: Option<&LssBroker>,
init_tx: &mpsc::Sender<ChannelRequest>,
dance_complete_tx: std_oneshot::Sender<bool>,
hsmd_raw: &mut Vec<u8>,
) -> Option<(LssBroker, Vec<u8>)> {
match connect_dance(cid, uri, lss_conn, init_tx).await {
match connect_dance(cid, uri, lss_conn, init_tx, hsmd_raw).await {
Ok(ret) => {
let _ = dance_complete_tx.send(true);
// none if lss_conn is some, some otherwise
@@ -78,13 +87,14 @@ async fn connect_dance(
uri: &str,
lss_conn_opt: Option<&LssBroker>,
mqtt_tx: &mpsc::Sender<ChannelRequest>,
hsmd_raw: &mut Vec<u8>,
) -> Result<Option<(LssBroker, Vec<u8>)>> {
let (new_broker, ir) = dance_step_1(cid, uri, lss_conn_opt, mqtt_tx).await?;
let lss_conn = new_broker.as_ref().xor(lss_conn_opt).ok_or(anyhow!(
"should never happen, either we use the newly initialized, or the one passed in"
))?;
dance_step_2(cid, lss_conn, mqtt_tx, &ir).await?;
let hsmd_init_reply = dance_step_3(cid, mqtt_tx).await?;
let hsmd_init_reply = dance_step_3(cid, mqtt_tx, hsmd_raw).await?;
// only some when lss_conn_opt is none
Ok(new_broker.map(|broker| (broker, hsmd_init_reply)))
}
@@ -123,19 +133,15 @@ async fn dance_step_2(
Ok(())
}
async fn dance_step_3(cid: &str, mqtt_tx: &mpsc::Sender<ChannelRequest>) -> Result<Vec<u8>> {
let (hsmd_raw, mut hsmd_init) = loop {
let hsmd_raw = HSMD_INIT.lock().unwrap().clone();
if hsmd_raw.is_empty() {
continue;
}
if let Message::HsmdInit(hsmd_init) = msgs::from_vec(hsmd_raw.clone()).unwrap() {
break (hsmd_raw, hsmd_init);
} else {
panic!("Not a hsmd init message");
}
async fn dance_step_3(
cid: &str,
mqtt_tx: &mpsc::Sender<ChannelRequest>,
hsmd_raw: &mut Vec<u8>,
) -> Result<Vec<u8>> {
let Message::HsmdInit(mut hsmd_init) = msgs::from_vec(hsmd_raw.clone()).unwrap() else {
panic!("Expected a hsmd init message here")
};
let hsmd_init_bytes = parser::raw_request_from_bytes(hsmd_raw, 0, [0u8; 33], 0)?;
let hsmd_init_bytes = parser::raw_request_from_bytes(hsmd_raw.clone(), 0, [0u8; 33], 0)?;
let reply = ChannelRequest::send(cid, topics::INIT_3_MSG, hsmd_init_bytes, mqtt_tx).await?;
if reply.is_empty() {
return Err(anyhow!("Hsmd init failed !"));
@@ -145,9 +151,7 @@ async fn dance_step_3(cid: &str, mqtt_tx: &mpsc::Sender<ChannelRequest>) -> Resu
match msgs::from_vec(hsmd_init_reply.clone()) {
Ok(Message::HsmdInitReplyV4(hir)) => {
hsmd_init.hsm_wire_max_version = hir.hsm_version;
let mut hsmd_raw = HSMD_INIT.lock().unwrap();
*hsmd_raw = hsmd_init.as_vec();
drop(hsmd_raw);
}
_ => panic!("Not a hsmd init reply v4"),
};

View File

@@ -11,6 +11,7 @@ mod util;
pub(crate) use sphinx_signer::lightning_signer::bitcoin::{self, secp256k1};
use crate::bitcoin::blockdata::constants::ChainHash;
use crate::chain_tracker::MqttSignerPort;
use crate::conn::{conns_set_pubkey, current_pubkey, new_connection, ChannelRequest, LssReq};
use crate::looper::SignerLoop;
@@ -26,7 +27,8 @@ use std::env;
use std::sync::Arc;
use url::Url;
use vls_frontend::{frontend::SourceFactory, Frontend};
use vls_proxy::client::UnixClient;
use vls_protocol::{msgs, msgs::Message};
use vls_proxy::client::{Client, UnixClient};
use vls_proxy::connection::{open_parent_fd, UnixConnection};
use vls_proxy::portfront::SignerPortFront;
use vls_proxy::util::{add_hsmd_args, handle_hsmd_version};
@@ -78,12 +80,20 @@ fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
broker_setup(settings, mqtt_rx, init_rx, conn_tx, error_tx.clone());
let cln_client_a = UnixClient::new(UnixConnection::new(parent_fd));
let mut cln_client_a = UnixClient::new(UnixConnection::new(parent_fd));
let hsmd_raw = cln_client_a.read_raw().unwrap();
let msg = msgs::from_vec(hsmd_raw.clone()).unwrap();
let Message::HsmdInit(ref m) = msg else {
panic!("Expected a hsmd init message first");
};
if ChainHash::using_genesis_block(settings.network).as_bytes() != m.chain_params.as_ref() {
panic!("The network settings of CLN and broker don't match!");
}
let (lss_tx, lss_rx) = mpsc::channel::<LssReq>(10000);
// TODO: add a validation here of the uri setting to make sure LSS is running
if let Ok(lss_uri) = env::var("VLS_LSS") {
log::info!("Spawning lss tasks...");
lss::lss_tasks(lss_uri, lss_rx, conn_rx, init_tx, cln_client_a);
lss::lss_tasks(lss_uri, lss_rx, conn_rx, init_tx, cln_client_a, hsmd_raw);
} else {
log::warn!("running without LSS");
}
@@ -116,7 +126,7 @@ fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let mut signer_loop = SignerLoop::new(cln_client, mqtt_tx.clone(), lss_tx);
// spawn CLN listener
std::thread::spawn(move || {
signer_loop.start(Some(settings.network));
signer_loop.start();
});
routes::launch_rocket(mqtt_tx, error_tx, settings)