diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 2f8618d..246b431 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -27,6 +27,7 @@ once_cell = "1.12.0" bitcoin = "0.29.0" async-trait = "0.1" url = { version = "2.2" } +toml = "0.5.9" [features] default = ["std"] diff --git a/broker/config/rumqttd.conf b/broker/config/rumqttd.conf deleted file mode 100644 index 7908ab5..0000000 --- a/broker/config/rumqttd.conf +++ /dev/null @@ -1,28 +0,0 @@ -# Broker id. Used to identify local node of the replication mesh -id = 0 - -# A commitlog read will pull full segment. Make sure that a segment isn't -# too big as async tcp writes readiness of one connection might affect tail -# latencies of other connection. Not a problem with preempting runtimes -[router] -id = 0 -dir = "/tmp/rumqttd" -max_segment_size = 10240 -max_segment_count = 10 -max_connections = 10001 - -# Configuration of server and connections that it accepts -[servers.1] -listen = "0.0.0.0:1883" -next_connection_delay_ms = 1 - [servers.1.connections] - connection_timeout_ms = 5000 - max_client_id_len = 256 - throttle_delay_ms = 0 - max_payload_size = 5120 - max_inflight_count = 200 - max_inflight_size = 1024 - login_credentials = [ { username = "sphinx-key", password = "sphinx-key-pass" } ] - -[console] -listen = "0.0.0.0:3030" \ No newline at end of file diff --git a/broker/src/init.rs b/broker/src/init.rs deleted file mode 100644 index f03c68b..0000000 --- a/broker/src/init.rs +++ /dev/null @@ -1,71 +0,0 @@ -use crate::ChannelRequest; -use bitcoin::Network; -use sphinx_key_parser as parser; -use sphinx_key_parser::MsgDriver; -use tokio::sync::{mpsc, oneshot}; -use vls_protocol::model::Secret; -use vls_protocol::{msgs, serde_bolt::WireString}; -use vls_proxy::util::{read_allowlist, read_integration_test_seed}; - -pub fn blocking_connect(tx: mpsc::Sender, network: Network) { - let init_msg_2 = crate::init::make_init_msg(network).expect("couldnt make init msg"); - let (reply_tx, reply_rx) = oneshot::channel(); - // Send a request to the MQTT handler to send to signer - let request = ChannelRequest { - message: init_msg_2, - reply_tx, - }; - tx.blocking_send(request).expect("could not blocking send"); - let res = reply_rx.blocking_recv().expect("couldnt receive"); - let reply = parser::response_from_bytes(res.reply, 0).expect("couldnt parse init receive"); - println!("REPLY {:?}", reply); -} - -pub async fn _connect(tx: mpsc::Sender, network: Network) { - let init_msg_2 = crate::init::make_init_msg(network).expect("could make init msg"); - let (reply_tx, reply_rx) = oneshot::channel(); - // Send a request to the MQTT handler to send to signer - let request = ChannelRequest { - message: init_msg_2, - reply_tx, - }; - let _ = tx.send(request).await; - let res = reply_rx.await.expect("couldnt receive"); - let reply = parser::response_from_bytes(res.reply, 0).expect("could parse init receive"); - println!("REPLY {:?}", reply); -} - -pub fn make_init_msg(network: Network) -> anyhow::Result> { - let allowlist = read_allowlist() - .into_iter() - .map(|s| WireString(s.as_bytes().to_vec())) - .collect::>(); - let seed = if network == Network::Bitcoin { - Some(Secret([ - 0x8c, 0xe8, 0x62, 0xab, 0xd5, 0x6b, 0xb4, 0x6a, 0x61, 0x7f, 0xaf, 0x13, 0x50, 0xc1, - 0xca, 0xf5, 0xb1, 0xee, 0x02, 0x97, 0xbf, 0xf3, 0xb8, 0xc9, 0x56, 0x63, 0x58, 0x9f, - 0xec, 0x8c, 0x45, 0x79, - ])) - } else { - read_integration_test_seed() - .map(|s| Secret(s)) - .or(Some(Secret([1; 32]))) - }; - // FIXME remove this - log::info!("allowlist {:?} seed {:?}", allowlist, seed); - let init = msgs::HsmdInit2 { - derivation_style: 0, - network_name: WireString(network.to_string().as_bytes().to_vec()), - dev_seed: seed, - dev_allowlist: allowlist, - }; - let sequence = 0; - let mut md = MsgDriver::new_empty(); - msgs::write_serial_request_header(&mut md, sequence, 0)?; - msgs::write(&mut md, init)?; - Ok(md.bytes()) - // msgs::read_serial_response_header(&mut serial, sequence)?; - // let init_reply: msgs::HsmdInit2Reply = msgs::read_message(&mut serial)?; - // log::info!("init reply {:?}", init_reply); - // Ok(()) -} diff --git a/broker/src/main.rs b/broker/src/main.rs index 374f7db..0242b6c 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,6 +1,5 @@ #![feature(once_cell)] mod chain_tracker; -mod init; mod mqtt; mod run_test; mod unix_fd; @@ -9,10 +8,9 @@ mod util; use crate::chain_tracker::MqttSignerPort; use crate::mqtt::start_broker; use crate::unix_fd::SignerLoop; -use bitcoin::Network; +use crate::util::read_broker_config; use clap::{App, AppSettings, Arg}; use std::env; -use std::str::FromStr; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use url::Url; @@ -39,6 +37,8 @@ pub struct ChannelReply { pub reply: Vec, } +const BROKER_CONFIG_PATH: &str = "../broker.conf"; + fn main() -> anyhow::Result<()> { let parent_fd = open_parent_fd(); @@ -71,17 +71,12 @@ fn main() -> anyhow::Result<()> { return Ok(()); } - let net_var = env::var("VLS_NETWORK").unwrap_or("regtest".to_string()); - let net_var = match net_var.as_str() { - ret @ ("bitcoin" | "regtest") => ret, - _ => panic!("Please set VLS_NETWORK to either 'bitcoin' or 'regtest'"), - }; - let network = Network::from_str(net_var).unwrap(); + let settings = read_broker_config(BROKER_CONFIG_PATH); let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); - log::info!("=> start broker on network: {}", network); - let runtime = start_broker(rx, status_tx, "sphinx-1"); + log::info!("=> start broker on network: {}", settings.network); + let runtime = start_broker(rx, status_tx, "sphinx-1", &settings); log::info!("=> wait for connected status"); // wait for connection = true let status = status_rx.blocking_recv().expect("couldnt receive"); @@ -94,7 +89,7 @@ fn main() -> anyhow::Result<()> { let frontend = Frontend::new( Arc::new(SignerPortFront { signer_port: Box::new(signer_port), - network, + network: settings.network, }), Url::parse(&btc_url).expect("malformed btc rpc url"), ); @@ -107,7 +102,7 @@ fn main() -> anyhow::Result<()> { let client = UnixClient::new(conn); // TODO pass status_rx into SignerLoop let mut signer_loop = SignerLoop::new(client, tx); - signer_loop.start(); + signer_loop.start(Some(&settings)); // }) Ok(()) diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index e3ac1ca..09a7760 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,3 +1,4 @@ +use crate::util::Settings; use crate::{ChannelReply, ChannelRequest}; use librumqttd::{ async_locallink, @@ -31,8 +32,9 @@ pub fn start_broker( mut receiver: mpsc::Receiver, status_sender: mpsc::Sender, expected_client_id: &str, + settings: &Settings, ) -> tokio::runtime::Runtime { - let config = config(); + let config = config(settings); let client_id = expected_client_id.to_string(); let (mut router, servers, builder) = async_locallink::construct(config.clone()); @@ -146,7 +148,7 @@ fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Opti } } -fn config() -> Config { +fn config(settings: &Settings) -> Config { use librumqttd::rumqttlog::Config as RouterConfig; use librumqttd::{ ConnectionLoginCredentials, ConnectionSettings, ConsoleSettings, ServerSettings, @@ -167,7 +169,7 @@ fn config() -> Config { id.to_string(), ServerSettings { cert: None, - listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(), + listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), settings.port).into(), next_connection_delay_ms: 1, connections: ConnectionSettings { connection_timeout_ms: 5000, diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 8102071..9b354b7 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -1,4 +1,5 @@ use crate::mqtt::start_broker; +use crate::util::Settings; use crate::ChannelRequest; use sphinx_key_parser as parser; use tokio::sync::{mpsc, oneshot}; @@ -13,9 +14,11 @@ pub fn run_test() { let mut id = 0u16; let mut sequence = 1; + let settings = Settings::default(); + let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); - let runtime = start_broker(rx, status_tx, CLIENT_ID); + let runtime = start_broker(rx, status_tx, CLIENT_ID, &settings); runtime.block_on(async { let mut connected = false; loop { diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index 4161fa5..9adcd8f 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -1,4 +1,6 @@ +use crate::util::Settings; use crate::{Channel, ChannelReply, ChannelRequest}; +use bitcoin::blockdata::constants::ChainHash; use log::*; use secp256k1::PublicKey; use sphinx_key_parser as parser; @@ -58,16 +60,16 @@ impl SignerLoop { } /// Start the read loop - pub fn start(&mut self) { + pub fn start(&mut self, settings: Option<&Settings>) { info!("loop {}: start", self.log_prefix); - match self.do_loop() { + match self.do_loop(settings) { Ok(()) => info!("loop {}: done", self.log_prefix), Err(Error::Eof) => info!("loop {}: ending", self.log_prefix), Err(e) => error!("loop {}: error {:?}", self.log_prefix, e), } } - fn do_loop(&mut self) -> Result<()> { + fn do_loop(&mut self, settings: Option<&Settings>) -> Result<()> { loop { let raw_msg = self.client.read_raw()?; debug!("loop {}: got raw", self.log_prefix); @@ -85,13 +87,24 @@ impl SignerLoop { }; let mut new_loop = SignerLoop::new_for_client(new_client, self.chan.sender.clone(), client_id); - thread::spawn(move || new_loop.start()); + thread::spawn(move || new_loop.start(None)); } Message::Memleak(_) => { let reply = msgs::MemleakReply { result: false }; self.client.write(reply)?; } - _ => { + msg => { + if let Message::HsmdInit(m) = msg { + if let Some(set) = settings { + if ChainHash::using_genesis_block(set.network).as_bytes() + != &m.chain_params.0 + { + panic!("The network settings of CLN and broker don't match!"); + } + } else { + panic!("Got HsmdInit without settings - likely because HsmdInit was sent after startup"); + } + } let reply = self.handle_message(raw_msg)?; // Write the reply to the node self.client.write_vec(reply)?; diff --git a/broker/src/util.rs b/broker/src/util.rs index a2ae7d7..9fd57e2 100644 --- a/broker/src/util.rs +++ b/broker/src/util.rs @@ -1,5 +1,41 @@ +use bitcoin::Network; +use std::default::Default; use std::env; +use std::fs; use std::str::FromStr; +use toml::Value; + +pub struct Settings { + pub network: Network, + pub port: u16, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + network: Network::Regtest, + port: 1883, + } + } +} + +pub fn read_broker_config(config_path: &str) -> Settings { + let mut settings = Settings::default(); + if let Ok(set) = fs::read_to_string(config_path) { + let table = Value::from_str(&set) + .expect("Couldn't read broker.conf make sure it follows the toml format"); + log::info!("Read broker.conf"); + if let Some(network) = read_network_setting(&table) { + settings.network = network + } + if let Some(port) = read_port_setting(&table) { + settings.port = port + } + } else { + log::info!("File broker.conf not found, using default settings"); + } + settings +} pub fn setup_logging(who: &str, level_arg: &str) { use fern::colors::{Color, ColoredLevelConfig}; @@ -32,3 +68,41 @@ pub fn setup_logging(who: &str, level_arg: &str) { .apply() .expect("log config"); } + +fn read_network_setting(table: &Value) -> Option { + if let None = table.get("network") { + log::info!("Network not specified, setting to default regtest"); + None + } else { + if !table["network"].is_str() + || table["network"].as_str().unwrap() != "bitcoin" + && table["network"].as_str().unwrap() != "regtest" + { + panic!("The network must be set to either 'bitcoin' or 'regtest'"); + } + log::info!( + "Read network setting: {}", + table["network"].as_str().unwrap() + ); + Some(Network::from_str(table["network"].as_str().unwrap()).unwrap()) + } +} + +fn read_port_setting(table: &Value) -> Option { + if let None = table.get("port") { + log::info!("Broker port not specified, setting to default 1883"); + None + } else { + let temp = table["port"] + .as_integer() + .expect("The port number is not an integer greater than 1023"); + if temp <= 1023 { + panic!("The port number is not an integer greater than 1023") + } + if temp > u16::MAX.into() { + panic!("The port number is way too big!") + } + log::info!("Read broker port setting: {}", temp); + Some(temp.try_into().unwrap()) + } +} diff --git a/signer/src/lib.rs b/signer/src/lib.rs index f0dc809..6e20b96 100644 --- a/signer/src/lib.rs +++ b/signer/src/lib.rs @@ -1,6 +1,7 @@ mod derive; mod randomstartingtime; +use lightning_signer::bitcoin::blockdata::constants::ChainHash; use lightning_signer::node::NodeServices; use lightning_signer::persist::Persist; use lightning_signer::policy::filter::PolicyFilter; @@ -15,6 +16,7 @@ use vls_protocol::serde_bolt::WireString; use vls_protocol_signer::handler::{Handler, RootHandler}; pub use vls_protocol_signer::lightning_signer; use vls_protocol_signer::lightning_signer::bitcoin::Network; +use vls_protocol_signer::lightning_signer::wallet::Wallet; pub use vls_protocol_signer::vls_protocol; pub use derive::node_keys as derive_node_keys; @@ -112,6 +114,15 @@ pub fn handle( _ => {} }; + if let Message::HsmdInit(ref m) = message { + if ChainHash::using_genesis_block(root_handler.node.network()).as_bytes() + != &m.chain_params.0 + { + log::error!("The network setting of CLN and VLS don't match!"); + panic!("The network setting of CLN and VLS don't match!"); + } + } + if do_log { log::info!("VLS msg: {:?}", message); } diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 106423e..60d3cd8 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -118,9 +118,9 @@ pub fn make_event_loop( Ok(msg) => { log::info!("CONTROL MSG {:?}", msg); // create a response and mqtt pub here - }, + } Err(e) => log::warn!("error parsing ctrl msg {:?}", e), - }, + }; } } }