From 53ea8e365bd1e420a314f03c7ae9d30e9e1d1491 Mon Sep 17 00:00:00 2001 From: decentclock Date: Thu, 25 Aug 2022 17:12:36 +0000 Subject: [PATCH 1/4] Add broker config file with network and port settings --- broker/Cargo.toml | 1 + broker/config/rumqttd.conf | 28 ------------------ broker/src/main.rs | 13 ++++----- broker/src/mqtt.rs | 12 ++++++-- broker/src/run_test.rs | 9 +++++- broker/src/util.rs | 59 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 83 insertions(+), 39 deletions(-) delete mode 100644 broker/config/rumqttd.conf diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 2f8618d..246b431 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -27,6 +27,7 @@ once_cell = "1.12.0" bitcoin = "0.29.0" async-trait = "0.1" url = { version = "2.2" } +toml = "0.5.9" [features] default = ["std"] diff --git a/broker/config/rumqttd.conf b/broker/config/rumqttd.conf deleted file mode 100644 index 7908ab5..0000000 --- a/broker/config/rumqttd.conf +++ /dev/null @@ -1,28 +0,0 @@ -# Broker id. Used to identify local node of the replication mesh -id = 0 - -# A commitlog read will pull full segment. Make sure that a segment isn't -# too big as async tcp writes readiness of one connection might affect tail -# latencies of other connection. Not a problem with preempting runtimes -[router] -id = 0 -dir = "/tmp/rumqttd" -max_segment_size = 10240 -max_segment_count = 10 -max_connections = 10001 - -# Configuration of server and connections that it accepts -[servers.1] -listen = "0.0.0.0:1883" -next_connection_delay_ms = 1 - [servers.1.connections] - connection_timeout_ms = 5000 - max_client_id_len = 256 - throttle_delay_ms = 0 - max_payload_size = 5120 - max_inflight_count = 200 - max_inflight_size = 1024 - login_credentials = [ { username = "sphinx-key", password = "sphinx-key-pass" } ] - -[console] -listen = "0.0.0.0:3030" \ No newline at end of file diff --git a/broker/src/main.rs b/broker/src/main.rs index 374f7db..d0a6f96 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -9,6 +9,7 @@ mod util; use crate::chain_tracker::MqttSignerPort; use crate::mqtt::start_broker; use crate::unix_fd::SignerLoop; +use crate::util::read_broker_config; use bitcoin::Network; use clap::{App, AppSettings, Arg}; use std::env; @@ -39,6 +40,8 @@ pub struct ChannelReply { pub reply: Vec, } +const BROKER_CONFIG_PATH: &str = "../broker.conf"; + fn main() -> anyhow::Result<()> { let parent_fd = open_parent_fd(); @@ -71,17 +74,13 @@ fn main() -> anyhow::Result<()> { return Ok(()); } - let net_var = env::var("VLS_NETWORK").unwrap_or("regtest".to_string()); - let net_var = match net_var.as_str() { - ret @ ("bitcoin" | "regtest") => ret, - _ => panic!("Please set VLS_NETWORK to either 'bitcoin' or 'regtest'"), - }; - let network = Network::from_str(net_var).unwrap(); + let settings = read_broker_config(BROKER_CONFIG_PATH); + let network = Network::from_str(settings["network"].as_str().unwrap()).unwrap(); let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); log::info!("=> start broker on network: {}", network); - let runtime = start_broker(rx, status_tx, "sphinx-1"); + let runtime = start_broker(rx, status_tx, "sphinx-1", &settings); log::info!("=> wait for connected status"); // wait for connection = true let status = status_rx.blocking_recv().expect("couldnt receive"); diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index e3ac1ca..ab591f7 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -11,6 +11,7 @@ use std::thread; use std::time::Duration; use tokio::sync::mpsc; use tokio::time::timeout; +use toml::Value; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; @@ -31,8 +32,9 @@ pub fn start_broker( mut receiver: mpsc::Receiver, status_sender: mpsc::Sender, expected_client_id: &str, + settings: &Value, ) -> tokio::runtime::Runtime { - let config = config(); + let config = config(settings); let client_id = expected_client_id.to_string(); let (mut router, servers, builder) = async_locallink::construct(config.clone()); @@ -146,7 +148,7 @@ fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Opti } } -fn config() -> Config { +fn config(settings: &Value) -> Config { use librumqttd::rumqttlog::Config as RouterConfig; use librumqttd::{ ConnectionLoginCredentials, ConnectionSettings, ConsoleSettings, ServerSettings, @@ -167,7 +169,11 @@ fn config() -> Config { id.to_string(), ServerSettings { cert: None, - listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(), + listen: SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + settings["port"].as_integer().unwrap().try_into().unwrap(), + ) + .into(), next_connection_delay_ms: 1, connections: ConnectionSettings { connection_timeout_ms: 5000, diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 8102071..5070f2b 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -2,6 +2,8 @@ use crate::mqtt::start_broker; use crate::ChannelRequest; use sphinx_key_parser as parser; use tokio::sync::{mpsc, oneshot}; +use toml::map::Map; +use toml::Value; use vls_protocol::serde_bolt::WireString; use vls_protocol::{msgs, msgs::Message}; @@ -13,9 +15,14 @@ pub fn run_test() { let mut id = 0u16; let mut sequence = 1; + let mut map = Map::new(); + map.insert("port".to_string(), Value::Integer(1883)); + map.insert("network".to_string(), Value::String("regtest".to_string())); + let settings = Value::Table(map); + let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); - let runtime = start_broker(rx, status_tx, CLIENT_ID); + let runtime = start_broker(rx, status_tx, CLIENT_ID, &settings); runtime.block_on(async { let mut connected = false; loop { diff --git a/broker/src/util.rs b/broker/src/util.rs index a2ae7d7..1129a33 100644 --- a/broker/src/util.rs +++ b/broker/src/util.rs @@ -1,5 +1,22 @@ use std::env; +use std::fs; use std::str::FromStr; +use toml::map::Map; +use toml::Value; + +pub fn read_broker_config(config_path: &str) -> Value { + let mut ret = Value::Table(Map::new()); + if let Ok(set) = fs::read_to_string(config_path) { + ret = Value::from_str(&set) + .expect("Couldn't read broker.conf make sure it follows the toml format"); + log::info!("Read broker.conf"); + } else { + log::info!("File broker.conf not found, using default settings"); + } + validate_network_setting(&mut ret); + validate_port_setting(&mut ret); + ret +} pub fn setup_logging(who: &str, level_arg: &str) { use fern::colors::{Color, ColoredLevelConfig}; @@ -32,3 +49,45 @@ pub fn setup_logging(who: &str, level_arg: &str) { .apply() .expect("log config"); } + +fn validate_network_setting(settings: &mut Value) { + if let None = settings.get("network") { + log::info!("Network not specified, setting to default regtest"); + settings + .as_table_mut() + .unwrap() + .insert("network".to_string(), Value::String("regtest".to_string())); + } else { + if !settings["network"].is_str() + || settings["network"].as_str().unwrap() != "bitcoin" + && settings["network"].as_str().unwrap() != "regtest" + { + panic!("The network must be set to either 'bitcoin' or 'regtest'"); + } + log::info!( + "Read network setting: {}", + settings["network"].as_str().unwrap() + ); + } +} + +fn validate_port_setting(settings: &mut Value) { + if let None = settings.get("port") { + log::info!("Broker port not specified, setting to default 1883"); + settings + .as_table_mut() + .unwrap() + .insert("port".to_string(), Value::Integer(1883)); + } else { + let temp = settings["port"] + .as_integer() + .expect("The port number is not an integer greater than 1023"); + if temp <= 1023 { + panic!("The port number is not an integer greater than 1023") + } + if temp > u16::MAX.into() { + panic!("The port number is way too big!") + } + log::info!("Read broker port setting: {}", temp); + } +} From cd465bf0845cec3bbaf87c74b72ac1f5f808d7bb Mon Sep 17 00:00:00 2001 From: decentclock Date: Fri, 26 Aug 2022 17:35:55 +0000 Subject: [PATCH 2/4] Cleanup --- broker/src/main.rs | 7 ++--- broker/src/mqtt.rs | 12 +++----- broker/src/run_test.rs | 8 ++---- broker/src/util.rs | 63 ++++++++++++++++++++++++++---------------- 4 files changed, 47 insertions(+), 43 deletions(-) diff --git a/broker/src/main.rs b/broker/src/main.rs index d0a6f96..245d4c8 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -10,10 +10,8 @@ use crate::chain_tracker::MqttSignerPort; use crate::mqtt::start_broker; use crate::unix_fd::SignerLoop; use crate::util::read_broker_config; -use bitcoin::Network; use clap::{App, AppSettings, Arg}; use std::env; -use std::str::FromStr; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use url::Url; @@ -75,11 +73,10 @@ fn main() -> anyhow::Result<()> { } let settings = read_broker_config(BROKER_CONFIG_PATH); - let network = Network::from_str(settings["network"].as_str().unwrap()).unwrap(); let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); - log::info!("=> start broker on network: {}", network); + log::info!("=> start broker on network: {}", settings.network); let runtime = start_broker(rx, status_tx, "sphinx-1", &settings); log::info!("=> wait for connected status"); // wait for connection = true @@ -93,7 +90,7 @@ fn main() -> anyhow::Result<()> { let frontend = Frontend::new( Arc::new(SignerPortFront { signer_port: Box::new(signer_port), - network, + network: settings.network, }), Url::parse(&btc_url).expect("malformed btc rpc url"), ); diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index ab591f7..09a7760 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,3 +1,4 @@ +use crate::util::Settings; use crate::{ChannelReply, ChannelRequest}; use librumqttd::{ async_locallink, @@ -11,7 +12,6 @@ use std::thread; use std::time::Duration; use tokio::sync::mpsc; use tokio::time::timeout; -use toml::Value; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; @@ -32,7 +32,7 @@ pub fn start_broker( mut receiver: mpsc::Receiver, status_sender: mpsc::Sender, expected_client_id: &str, - settings: &Value, + settings: &Settings, ) -> tokio::runtime::Runtime { let config = config(settings); let client_id = expected_client_id.to_string(); @@ -148,7 +148,7 @@ fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Opti } } -fn config(settings: &Value) -> Config { +fn config(settings: &Settings) -> Config { use librumqttd::rumqttlog::Config as RouterConfig; use librumqttd::{ ConnectionLoginCredentials, ConnectionSettings, ConsoleSettings, ServerSettings, @@ -169,11 +169,7 @@ fn config(settings: &Value) -> Config { id.to_string(), ServerSettings { cert: None, - listen: SocketAddrV4::new( - Ipv4Addr::new(0, 0, 0, 0), - settings["port"].as_integer().unwrap().try_into().unwrap(), - ) - .into(), + listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), settings.port).into(), next_connection_delay_ms: 1, connections: ConnectionSettings { connection_timeout_ms: 5000, diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 5070f2b..9b354b7 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -1,9 +1,8 @@ use crate::mqtt::start_broker; +use crate::util::Settings; use crate::ChannelRequest; use sphinx_key_parser as parser; use tokio::sync::{mpsc, oneshot}; -use toml::map::Map; -use toml::Value; use vls_protocol::serde_bolt::WireString; use vls_protocol::{msgs, msgs::Message}; @@ -15,10 +14,7 @@ pub fn run_test() { let mut id = 0u16; let mut sequence = 1; - let mut map = Map::new(); - map.insert("port".to_string(), Value::Integer(1883)); - map.insert("network".to_string(), Value::String("regtest".to_string())); - let settings = Value::Table(map); + let settings = Settings::default(); let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); diff --git a/broker/src/util.rs b/broker/src/util.rs index 1129a33..9fd57e2 100644 --- a/broker/src/util.rs +++ b/broker/src/util.rs @@ -1,21 +1,40 @@ +use bitcoin::Network; +use std::default::Default; use std::env; use std::fs; use std::str::FromStr; -use toml::map::Map; use toml::Value; -pub fn read_broker_config(config_path: &str) -> Value { - let mut ret = Value::Table(Map::new()); +pub struct Settings { + pub network: Network, + pub port: u16, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + network: Network::Regtest, + port: 1883, + } + } +} + +pub fn read_broker_config(config_path: &str) -> Settings { + let mut settings = Settings::default(); if let Ok(set) = fs::read_to_string(config_path) { - ret = Value::from_str(&set) + let table = Value::from_str(&set) .expect("Couldn't read broker.conf make sure it follows the toml format"); log::info!("Read broker.conf"); + if let Some(network) = read_network_setting(&table) { + settings.network = network + } + if let Some(port) = read_port_setting(&table) { + settings.port = port + } } else { log::info!("File broker.conf not found, using default settings"); } - validate_network_setting(&mut ret); - validate_port_setting(&mut ret); - ret + settings } pub fn setup_logging(who: &str, level_arg: &str) { @@ -50,36 +69,31 @@ pub fn setup_logging(who: &str, level_arg: &str) { .expect("log config"); } -fn validate_network_setting(settings: &mut Value) { - if let None = settings.get("network") { +fn read_network_setting(table: &Value) -> Option { + if let None = table.get("network") { log::info!("Network not specified, setting to default regtest"); - settings - .as_table_mut() - .unwrap() - .insert("network".to_string(), Value::String("regtest".to_string())); + None } else { - if !settings["network"].is_str() - || settings["network"].as_str().unwrap() != "bitcoin" - && settings["network"].as_str().unwrap() != "regtest" + if !table["network"].is_str() + || table["network"].as_str().unwrap() != "bitcoin" + && table["network"].as_str().unwrap() != "regtest" { panic!("The network must be set to either 'bitcoin' or 'regtest'"); } log::info!( "Read network setting: {}", - settings["network"].as_str().unwrap() + table["network"].as_str().unwrap() ); + Some(Network::from_str(table["network"].as_str().unwrap()).unwrap()) } } -fn validate_port_setting(settings: &mut Value) { - if let None = settings.get("port") { +fn read_port_setting(table: &Value) -> Option { + if let None = table.get("port") { log::info!("Broker port not specified, setting to default 1883"); - settings - .as_table_mut() - .unwrap() - .insert("port".to_string(), Value::Integer(1883)); + None } else { - let temp = settings["port"] + let temp = table["port"] .as_integer() .expect("The port number is not an integer greater than 1023"); if temp <= 1023 { @@ -89,5 +103,6 @@ fn validate_port_setting(settings: &mut Value) { panic!("The port number is way too big!") } log::info!("Read broker port setting: {}", temp); + Some(temp.try_into().unwrap()) } } From 6f81516a6061b24f25ffeb0748f750faf89cbd60 Mon Sep 17 00:00:00 2001 From: decentclock Date: Fri, 26 Aug 2022 19:20:26 +0000 Subject: [PATCH 3/4] Check that broker and CLN are on the same network --- broker/src/main.rs | 2 +- broker/src/unix_fd.rs | 23 ++++++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/broker/src/main.rs b/broker/src/main.rs index 245d4c8..229cfd8 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -103,7 +103,7 @@ fn main() -> anyhow::Result<()> { let client = UnixClient::new(conn); // TODO pass status_rx into SignerLoop let mut signer_loop = SignerLoop::new(client, tx); - signer_loop.start(); + signer_loop.start(Some(&settings)); // }) Ok(()) diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index 4161fa5..9adcd8f 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -1,4 +1,6 @@ +use crate::util::Settings; use crate::{Channel, ChannelReply, ChannelRequest}; +use bitcoin::blockdata::constants::ChainHash; use log::*; use secp256k1::PublicKey; use sphinx_key_parser as parser; @@ -58,16 +60,16 @@ impl SignerLoop { } /// Start the read loop - pub fn start(&mut self) { + pub fn start(&mut self, settings: Option<&Settings>) { info!("loop {}: start", self.log_prefix); - match self.do_loop() { + match self.do_loop(settings) { Ok(()) => info!("loop {}: done", self.log_prefix), Err(Error::Eof) => info!("loop {}: ending", self.log_prefix), Err(e) => error!("loop {}: error {:?}", self.log_prefix, e), } } - fn do_loop(&mut self) -> Result<()> { + fn do_loop(&mut self, settings: Option<&Settings>) -> Result<()> { loop { let raw_msg = self.client.read_raw()?; debug!("loop {}: got raw", self.log_prefix); @@ -85,13 +87,24 @@ impl SignerLoop { }; let mut new_loop = SignerLoop::new_for_client(new_client, self.chan.sender.clone(), client_id); - thread::spawn(move || new_loop.start()); + thread::spawn(move || new_loop.start(None)); } Message::Memleak(_) => { let reply = msgs::MemleakReply { result: false }; self.client.write(reply)?; } - _ => { + msg => { + if let Message::HsmdInit(m) = msg { + if let Some(set) = settings { + if ChainHash::using_genesis_block(set.network).as_bytes() + != &m.chain_params.0 + { + panic!("The network settings of CLN and broker don't match!"); + } + } else { + panic!("Got HsmdInit without settings - likely because HsmdInit was sent after startup"); + } + } let reply = self.handle_message(raw_msg)?; // Write the reply to the node self.client.write_vec(reply)?; From d0b5c4c4c11a2420e414b6f1d408ec99e64bf85f Mon Sep 17 00:00:00 2001 From: decentclock Date: Fri, 26 Aug 2022 15:55:43 -0600 Subject: [PATCH 4/4] Check that broker and signer are on the same network --- signer/src/lib.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/signer/src/lib.rs b/signer/src/lib.rs index 59b8653..c638102 100644 --- a/signer/src/lib.rs +++ b/signer/src/lib.rs @@ -1,5 +1,6 @@ mod randomstartingtime; +use lightning_signer::bitcoin::blockdata::constants::ChainHash; use lightning_signer::node::NodeServices; use lightning_signer::persist::Persist; use lightning_signer::policy::filter::PolicyFilter; @@ -16,6 +17,7 @@ use vls_protocol::msgs::{self, read_serial_request_header, write_serial_response use vls_protocol::serde_bolt::WireString; use vls_protocol_signer::handler::{Handler, RootHandler}; use vls_protocol_signer::lightning_signer::bitcoin::Network; +use vls_protocol_signer::lightning_signer::wallet::Wallet; pub use sphinx_key_parser::MsgDriver; pub use sphinx_key_persister::FsPersister; @@ -95,6 +97,15 @@ pub fn handle( _ => {} }; + if let Message::HsmdInit(ref m) = message { + if ChainHash::using_genesis_block(root_handler.node.network()).as_bytes() + != &m.chain_params.0 + { + log::error!("The network setting of CLN and VLS don't match!"); + panic!("The network setting of CLN and VLS don't match!"); + } + } + if do_log { log::info!("VLS msg: {:?}", message); }