From b8afe2267599425c10b8086a065eb045aad38127 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Wed, 31 May 2023 10:44:39 -0700 Subject: [PATCH] broker LSS integration, init, handle vls muts, connect to lss grpc, better chan msgs --- broker/Cargo.lock | 18 +++++--- broker/Cargo.toml | 4 +- broker/lss.md | 6 +++ broker/src/conn.rs | 16 +++++++ broker/src/{unix_fd.rs => looper.rs} | 67 ++++++++++++++++++++-------- broker/src/main.rs | 66 +++++++++++++++++++++------ broker/src/mqtt.rs | 30 +++++-------- broker/src/run_test.rs | 1 + sphinx-key/src/core/events.rs | 11 ++++- tester/src/main.rs | 3 +- 10 files changed, 162 insertions(+), 60 deletions(-) rename broker/src/{unix_fd.rs => looper.rs} (73%) diff --git a/broker/Cargo.lock b/broker/Cargo.lock index c47ba8c..720dacd 100644 --- a/broker/Cargo.lock +++ b/broker/Cargo.lock @@ -1713,15 +1713,14 @@ dependencies = [ [[package]] name = "lss-connector" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs#5623e8845fcd75c61be877d7e6285a3036bab1cb" dependencies = [ "anyhow", - "hex", "lightning-storage-server", "log", "rmp-serde", "secp256k1", "serde", + "serde-big-array", "tokio", "vls-frontend", "vls-protocol-signer", @@ -2956,6 +2955,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-big-array" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11fc7cc2c76d73e0f27ee52abbd64eec84d46f370c88371120433196934e4b7f" +dependencies = [ + "serde", +] + [[package]] name = "serde_bolt" version = "0.2.4" @@ -3164,7 +3172,7 @@ dependencies = [ [[package]] name = "sphinx-auther" version = "0.1.12" -source = "git+https://github.com/stakwork/sphinx-rs#5623e8845fcd75c61be877d7e6285a3036bab1cb" +source = "git+https://github.com/stakwork/sphinx-rs#edaae2c7c3a0839b63ebc6e88c6c135960686191" dependencies = [ "anyhow", "base64 0.13.1", @@ -3176,7 +3184,7 @@ dependencies = [ [[package]] name = "sphinx-glyph" version = "0.1.2" -source = "git+https://github.com/stakwork/sphinx-rs#5623e8845fcd75c61be877d7e6285a3036bab1cb" +source = "git+https://github.com/stakwork/sphinx-rs#edaae2c7c3a0839b63ebc6e88c6c135960686191" dependencies = [ "anyhow", "hex", @@ -3222,7 +3230,7 @@ dependencies = [ [[package]] name = "sphinx-signer" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs#5623e8845fcd75c61be877d7e6285a3036bab1cb" +source = "git+https://github.com/stakwork/sphinx-rs#edaae2c7c3a0839b63ebc6e88c6c135960686191" dependencies = [ "anyhow", "bip39", diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 8c8b041..c8aad5f 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -9,8 +9,8 @@ strip = "debuginfo" [dependencies] sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs" } -lss-connector = { git = "https://github.com/stakwork/sphinx-rs" } -# lss-connector = { path = "../../sphinx-rs/lss-connector" } +# lss-connector = { git = "https://github.com/stakwork/sphinx-rs" } +lss-connector = { path = "../../sphinx-rs/lss-connector" } # sphinx-key-parser = { path = "../parser" } vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "e13c8cd994b310f598c0b2902741d89ad5472382" } vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "e13c8cd994b310f598c0b2902741d89ad5472382" } diff --git a/broker/lss.md b/broker/lss.md index 051f671..d6322f1 100644 --- a/broker/lss.md +++ b/broker/lss.md @@ -21,6 +21,7 @@ - create Auth - LssClient::new +- get ALL muts from cloud - let (muts, server_hmac) = client.get("".to_string(), &nonce) - send the muts and server_hmac to signer @@ -37,8 +38,13 @@ ##### broker - store the muts using the LssClient (client.put(muts, &client_hmac)) +- send server_hmac back to signer??? - init the Unix Fd connection finally, so the hsmd_init message comes +##### signer + +- need to verify server hmac here??? + ### VLS ##### signer diff --git a/broker/src/conn.rs b/broker/src/conn.rs index 20c6062..baf2894 100644 --- a/broker/src/conn.rs +++ b/broker/src/conn.rs @@ -80,5 +80,21 @@ impl ChannelRequest { // mpsc reply #[derive(Debug)] pub struct ChannelReply { + // the return topic + pub topic: String, pub reply: Vec, } + +/// Responses are received on the oneshot sender +#[derive(Debug)] +pub struct LssReq { + pub message: Vec, + pub reply_tx: oneshot::Sender>, +} +impl LssReq { + pub fn new(message: Vec) -> (Self, oneshot::Receiver>) { + let (reply_tx, reply_rx) = oneshot::channel(); + let cr = Self { message, reply_tx }; + (cr, reply_rx) + } +} diff --git a/broker/src/unix_fd.rs b/broker/src/looper.rs similarity index 73% rename from broker/src/unix_fd.rs rename to broker/src/looper.rs index a0df152..3b91f7a 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/looper.rs @@ -1,8 +1,8 @@ -use crate::conn::{Channel, ChannelReply, ChannelRequest}; +use crate::conn::{Channel, ChannelRequest, LssReq}; use crate::util::Settings; use bitcoin::blockdata::constants::ChainHash; use log::*; -use rocket::tokio::sync::{mpsc, oneshot}; +use rocket::tokio::sync::mpsc; use secp256k1::PublicKey; use sphinx_signer::{parser, sphinx_glyph::topics}; use std::thread; @@ -31,23 +31,30 @@ pub struct SignerLoop { log_prefix: String, chan: Channel, client_id: Option, + lss_tx: mpsc::Sender, } impl SignerLoop { /// Create a loop for the root (lightningd) connection, but doesn't start it yet - pub fn new(client: C, sender: mpsc::Sender) -> Self { + pub fn new( + client: C, + lss_tx: mpsc::Sender, + sender: mpsc::Sender, + ) -> Self { let log_prefix = format!("{}/{}", std::process::id(), client.id()); Self { client, log_prefix, chan: Channel::new(sender), client_id: None, + lss_tx, } } // Create a loop for a non-root connection fn new_for_client( client: C, + lss_tx: mpsc::Sender, sender: mpsc::Sender, client_id: ClientId, ) -> Self { @@ -57,6 +64,7 @@ impl SignerLoop { log_prefix, chan: Channel::new(sender), client_id: Some(client_id), + lss_tx, } } @@ -86,8 +94,12 @@ impl SignerLoop { peer_id, dbid: m.dbid, }; - let mut new_loop = - SignerLoop::new_for_client(new_client, self.chan.sender.clone(), client_id); + let mut new_loop = SignerLoop::new_for_client( + new_client, + self.lss_tx.clone(), + self.chan.sender.clone(), + client_id, + ); thread::spawn(move || new_loop.start(None)); } Message::Memleak(_) => { @@ -110,9 +122,8 @@ impl SignerLoop { } } let reply = self.handle_message(raw_msg, catch_init)?; - // Write the reply to the node + // Write the reply to CLN self.client.write_vec(reply)?; - // info!("replied {}", self.log_prefix); } } } @@ -126,10 +137,18 @@ impl SignerLoop { .map(|c| c.peer_id.serialize()) .unwrap_or([0u8; 33]); let md = parser::raw_request_from_bytes(message, self.chan.sequence, peer_id, dbid)?; - // send to glyph - let reply_rx = self.send_request(md)?; - let res = self.get_reply(reply_rx)?; - let reply = parser::raw_response_from_bytes(res, self.chan.sequence)?; + // send to signer + log::info!("SEND ON {}", topics::VLS); + let (_res_topic, res) = self.send_request_and_get_reply(topics::VLS, md)?; + // send reply to LSS to store muts + log::info!("GOT ON {}", _res_topic); + let lss_reply = self.send_lss_and_get_reply(res)?; + // send to signer for HMAC validation, and get final reply + log::info!("SEND ON {}", topics::LSS_MSG); + let (_res_topic, res2) = self.send_request_and_get_reply(topics::LSS_MSG, lss_reply)?; + // create reply for CLN + log::info!("GOT ON {}, send to CLN", _res_topic); + let reply = parser::raw_response_from_bytes(res2, self.chan.sequence)?; // add to the sequence self.chan.sequence = self.chan.sequence.wrapping_add(1); // catch the pubkey if its the first one connection @@ -154,21 +173,31 @@ impl SignerLoop { Ok(()) } - fn send_request(&mut self, message: Vec) -> Result> { + // returns (topic, payload) + // might halt if signer is offline + fn send_request_and_get_reply( + &mut self, + topic: &str, + message: Vec, + ) -> Result<(String, Vec)> { // Send a request to the MQTT handler to send to signer - let (request, reply_rx) = ChannelRequest::new(topics::VLS, message); + let (request, reply_rx) = ChannelRequest::new(topic, message); // This can fail if MQTT shuts down self.chan .sender .blocking_send(request) .map_err(|_| Error::Eof)?; - Ok(reply_rx) + let reply = reply_rx.blocking_recv().map_err(|_| Error::Eof)?; + + Ok((reply.topic, reply.reply)) } - fn get_reply(&mut self, reply_rx: oneshot::Receiver) -> Result> { - // Wait for the signer reply - // Can fail if MQTT shuts down - let reply = reply_rx.blocking_recv().map_err(|_| Error::Eof)?; - Ok(reply.reply) + fn send_lss_and_get_reply(&mut self, message: Vec) -> Result> { + // Send a request to the MQTT handler to send to signer + let (request, reply_rx) = LssReq::new(message); + // This can fail if MQTT shuts down + self.lss_tx.blocking_send(request).map_err(|_| Error::Eof)?; + let res = reply_rx.blocking_recv().map_err(|_| Error::Eof)?; + Ok(res) } } diff --git a/broker/src/main.rs b/broker/src/main.rs index ee3d0b9..30f489f 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -4,15 +4,16 @@ mod error_log; mod mqtt; mod routes; mod run_test; -mod unix_fd; +mod looper; mod util; mod conn; -use crate::conn::{Connections, ChannelRequest}; +use crate::conn::{Connections, ChannelRequest, LssReq}; use crate::chain_tracker::MqttSignerPort; use crate::mqtt::{check_auth, start_broker}; -use crate::unix_fd::SignerLoop; +use crate::looper::SignerLoop; use crate::util::{read_broker_config, Settings}; +use anyhow::Result; use clap::{arg, App}; use rocket::tokio::{ self, @@ -27,7 +28,9 @@ use vls_proxy::client::UnixClient; use vls_proxy::connection::{open_parent_fd, UnixConnection}; use vls_proxy::portfront::SignerPortFront; use vls_proxy::util::{add_hsmd_args, handle_hsmd_version}; -use lss_connector::LssBroker; +use lss_connector::{LssBroker, Response, lss_handle}; +use sphinx_signer::sphinx_glyph::topics; + #[rocket::launch] async fn rocket() -> _ { @@ -73,8 +76,20 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let (error_tx, error_rx) = broadcast::channel(10000); error_log::log_errors(error_rx); + // waits until first connection let conns = broker_setup(settings, mqtt_rx, error_tx.clone()).await; + let (lss_tx, lss_rx) = mpsc::channel(10000); + let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") { + // waits until LSS confirmation from signer + let lss_broker = lss_setup(&lss_uri, lss_rx, mqtt_tx.clone()).await.unwrap(); + log::info!("=> lss broker connection created!"); + Some(lss_broker) + } else { + log::warn!("running without LSS"); + None + }; + if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { let signer_port = Box::new(MqttSignerPort::new(mqtt_tx.clone())); let port_front = SignerPortFront::new(signer_port, settings.network); @@ -90,9 +105,13 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { } else { log::warn!("Running without a frontend") } + + // test sleep FIXME + // tokio::time::sleep(std::time::Duration::from_secs(10)).await; + let cln_client = UnixClient::new(UnixConnection::new(parent_fd)); // TODO pass status_rx into SignerLoop? - let mut signer_loop = SignerLoop::new(cln_client, mqtt_tx.clone()); + let mut signer_loop = SignerLoop::new(cln_client, lss_tx.clone(), mqtt_tx.clone()); // spawn CLN listener std::thread::spawn(move || { signer_loop.start(Some(settings)); @@ -101,6 +120,35 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { routes::launch_rocket(mqtt_tx, error_tx, settings, conns) } +pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver, mqtt_tx: mpsc::Sender) -> Result { + + // LSS required + let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?; + let (req1, reply_rx) = ChannelRequest::new(topics::LSS_MSG, msg_bytes); + let _ = mqtt_tx.send(req1).await; + let first_lss_response = reply_rx.await?; + + let ir = Response::from_slice(&first_lss_response.reply)?.as_init()?; + + let (lss_conn, msg_bytes2) = LssBroker::new(uri, ir, spk).await?; + let (req2, reply_rx2) = ChannelRequest::new(topics::LSS_MSG, msg_bytes2); + let _ = mqtt_tx.send(req2).await; + let created_res = reply_rx2.await?; + let cr = Response::from_slice(&created_res.reply)?.as_created()?; + + lss_conn.handle(Response::Created(cr)).await?; + + let persister = lss_conn.persister(); + tokio::task::spawn(async move{ + while let Some(req) = lss_rx.recv().await { + let msg = lss_handle(&persister, &req.message).await.unwrap(); + let _ = req.reply_tx.send(msg); + } + }); + + Ok(lss_conn) +} + // blocks until a connection received pub async fn broker_setup( settings: Settings, @@ -123,14 +171,6 @@ pub async fn broker_setup( } }); - // LSS - let lss_client = if let Ok(uri) = env::var("VLS_LSS") { - let lss_conn = LssBroker::new(uri.clone()).await.unwrap(); - Some(lss_conn) - } else { - None - }; - // broker log::info!("=> start broker on network: {}", settings.network); start_broker( diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 4bc2fb9..af45223 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -75,10 +75,9 @@ pub fn start_broker( }); // String is the client id - let (msg_tx, msg_rx) = std::sync::mpsc::channel::<(String, Vec)>(); - let (lss_tx, lss_rx) = std::sync::mpsc::channel::>(); + let (msg_tx, msg_rx) = std::sync::mpsc::channel::<(String, String, Vec)>(); - // receive from CLN, Frontend, or Controller + // receive from CLN, Frontend, Controller, or LSS let conns_ = connections.clone(); let _relay_task = std::thread::spawn(move || { while let Some(msg) = receiver.blocking_recv() { @@ -88,9 +87,8 @@ pub fn start_broker( if let Err(e) = link_tx.publish(pub_topic, msg.message.clone()) { log::error!("failed to pub to link_tx! {} {:?}", cid, e); } - let rep = msg_rx.recv(); - if let Ok((cid, reply)) = rep { - if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) { + if let Ok((cid, topic, reply)) = msg_rx.recv() { + if let Err(_) = msg.reply_tx.send(ChannelReply { reply, topic }) { log::warn!("could not send on reply_tx {}", cid); } } @@ -107,15 +105,15 @@ pub fn start_broker( } for client in client_list.iter() { let pub_topic = format!("{}/{}", client, msg.topic); + log::info!("SENDING TO {} on topic {}", client, msg.topic); if let Err(e) = link_tx.publish(pub_topic, msg.message.clone()) { log::error!("failed to pub to link_tx! {:?}", e); } // and receive from the correct client (or timeout to next) let dur = Duration::from_secs(9); - let rep = msg_rx.recv_timeout(dur); - if let Ok((cid, reply)) = rep { + if let Ok((cid, topic, reply)) = msg_rx.recv_timeout(dur) { if &cid == client { - if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) { + if let Err(_) = msg.reply_tx.send(ChannelReply { reply, topic }) { log::warn!("could not send on reply_tx"); } break 'retry_loop; @@ -144,19 +142,15 @@ pub fn start_broker( let topic = topic_res.unwrap(); if topic.ends_with(topics::ERROR) { let _ = error_sender.send(f.publish.payload.to_vec()); - } else if topic.ends_with(topics::LSS_PUB) { - // send to LSS client here - // get the hmac back, pub to the device - if let Err(e) = lss_tx.send(f.publish.payload.to_vec()) { - log::error!("failed to pub to lss_tx! {:?}", e); - } } else { + // VLS, CONTROL, LSS let ts: Vec<&str> = topic.split("/").collect(); if ts.len() != 2 { continue; } let cid = ts[0].to_string(); - if let Err(e) = msg_tx.send((cid, f.publish.payload.to_vec())) { + let topic = ts[1].to_string(); + if let Err(e) = msg_tx.send((cid, topic, f.publish.payload.to_vec())) { log::error!("failed to pub to msg_tx! {:?}", e); } } @@ -182,11 +176,11 @@ fn subs(cid: &str, mut ltx: LinkTx) { ltx.subscribe(format!("{}/{}", cid, topics::CONTROL_RETURN)) .unwrap(); ltx.subscribe(format!("{}/{}", cid, topics::ERROR)).unwrap(); - ltx.subscribe(format!("{}/{}", cid, topics::LSS_PUB)) + ltx.subscribe(format!("{}/{}", cid, topics::LSS_RES)) .unwrap(); } -fn unsubs(cid: &str, mut ltx: LinkTx) { +fn unsubs(_cid: &str, mut _ltx: LinkTx) { // ltx.unsubscribe(format!("{}/{}", cid, topics::VLS_RETURN)) // .unwrap(); // ltx.unsubscribe(format!("{}/{}", cid, topics::CONTROL_RETURN)) diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 7eb1081..c5fb203 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -17,6 +17,7 @@ pub async fn run_test() -> rocket::Rocket { let settings = Settings::default(); let (mqtt_tx, mqtt_rx) = mpsc::channel(10000); let (error_tx, error_rx) = broadcast::channel(10000); + crate::error_log::log_errors(error_rx); // block until connection diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index b39770b..fb4e3a7 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -86,8 +86,15 @@ pub fn make_event_loop( let persister: Arc = Arc::new(FsPersister::new(&ROOT_STORE, Some(8))); // initialize the RootHandler - let root_handler = - sphinx_signer::root::init(seed, network, policy, persister).expect("failed to init signer"); + let handler_builder = sphinx_signer::root::builder(seed, network, policy, persister) + .expect("failed to init signer"); + log::info!("create root handler now"); + let (root_handler, _muts) = handler_builder.build(); + log::info!("root_handler created"); + // TODO + // wait for an Event::LssMessage of type Init + // get server_pubkey out + // and init the LSS // signing loop log::info!("=> starting the main signing loop..."); diff --git a/tester/src/main.rs b/tester/src/main.rs index acd1eba..c506029 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -105,8 +105,9 @@ async fn run_main( let seed32: [u8; 32] = seed.try_into().expect("wrong seed"); let persister: Arc = Arc::new(FsPersister::new(&store_path, None)); let policy = types::Policy::default(); - let root_handler = sphinx_signer::root::init(seed32, network, &policy, persister) + let handler_builder = sphinx_signer::root::builder(seed32, network, &policy, persister) .expect("Could not initialize root_handler"); + let (root_handler, _muts) = handler_builder.build(); // the actual handler loop loop { match eventloop.poll().await {