diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 2f8618d..246b431 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -27,6 +27,7 @@ once_cell = "1.12.0" bitcoin = "0.29.0" async-trait = "0.1" url = { version = "2.2" } +toml = "0.5.9" [features] default = ["std"] diff --git a/broker/config/rumqttd.conf b/broker/config/rumqttd.conf deleted file mode 100644 index 7908ab5..0000000 --- a/broker/config/rumqttd.conf +++ /dev/null @@ -1,28 +0,0 @@ -# Broker id. Used to identify local node of the replication mesh -id = 0 - -# A commitlog read will pull full segment. Make sure that a segment isn't -# too big as async tcp writes readiness of one connection might affect tail -# latencies of other connection. Not a problem with preempting runtimes -[router] -id = 0 -dir = "/tmp/rumqttd" -max_segment_size = 10240 -max_segment_count = 10 -max_connections = 10001 - -# Configuration of server and connections that it accepts -[servers.1] -listen = "0.0.0.0:1883" -next_connection_delay_ms = 1 - [servers.1.connections] - connection_timeout_ms = 5000 - max_client_id_len = 256 - throttle_delay_ms = 0 - max_payload_size = 5120 - max_inflight_count = 200 - max_inflight_size = 1024 - login_credentials = [ { username = "sphinx-key", password = "sphinx-key-pass" } ] - -[console] -listen = "0.0.0.0:3030" \ No newline at end of file diff --git a/broker/src/main.rs b/broker/src/main.rs index 374f7db..d0a6f96 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -9,6 +9,7 @@ mod util; use crate::chain_tracker::MqttSignerPort; use crate::mqtt::start_broker; use crate::unix_fd::SignerLoop; +use crate::util::read_broker_config; use bitcoin::Network; use clap::{App, AppSettings, Arg}; use std::env; @@ -39,6 +40,8 @@ pub struct ChannelReply { pub reply: Vec, } +const BROKER_CONFIG_PATH: &str = "../broker.conf"; + fn main() -> anyhow::Result<()> { let parent_fd = open_parent_fd(); @@ -71,17 +74,13 @@ fn main() -> anyhow::Result<()> { return Ok(()); } - let net_var = env::var("VLS_NETWORK").unwrap_or("regtest".to_string()); - let net_var = match net_var.as_str() { - ret @ ("bitcoin" | "regtest") => ret, - _ => panic!("Please set VLS_NETWORK to either 'bitcoin' or 'regtest'"), - }; - let network = Network::from_str(net_var).unwrap(); + let settings = read_broker_config(BROKER_CONFIG_PATH); + let network = Network::from_str(settings["network"].as_str().unwrap()).unwrap(); let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); log::info!("=> start broker on network: {}", network); - let runtime = start_broker(rx, status_tx, "sphinx-1"); + let runtime = start_broker(rx, status_tx, "sphinx-1", &settings); log::info!("=> wait for connected status"); // wait for connection = true let status = status_rx.blocking_recv().expect("couldnt receive"); diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index e3ac1ca..ab591f7 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -11,6 +11,7 @@ use std::thread; use std::time::Duration; use tokio::sync::mpsc; use tokio::time::timeout; +use toml::Value; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; @@ -31,8 +32,9 @@ pub fn start_broker( mut receiver: mpsc::Receiver, status_sender: mpsc::Sender, expected_client_id: &str, + settings: &Value, ) -> tokio::runtime::Runtime { - let config = config(); + let config = config(settings); let client_id = expected_client_id.to_string(); let (mut router, servers, builder) = async_locallink::construct(config.clone()); @@ -146,7 +148,7 @@ fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Opti } } -fn config() -> Config { +fn config(settings: &Value) -> Config { use librumqttd::rumqttlog::Config as RouterConfig; use librumqttd::{ ConnectionLoginCredentials, ConnectionSettings, ConsoleSettings, ServerSettings, @@ -167,7 +169,11 @@ fn config() -> Config { id.to_string(), ServerSettings { cert: None, - listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(), + listen: SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + settings["port"].as_integer().unwrap().try_into().unwrap(), + ) + .into(), next_connection_delay_ms: 1, connections: ConnectionSettings { connection_timeout_ms: 5000, diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 8102071..5070f2b 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -2,6 +2,8 @@ use crate::mqtt::start_broker; use crate::ChannelRequest; use sphinx_key_parser as parser; use tokio::sync::{mpsc, oneshot}; +use toml::map::Map; +use toml::Value; use vls_protocol::serde_bolt::WireString; use vls_protocol::{msgs, msgs::Message}; @@ -13,9 +15,14 @@ pub fn run_test() { let mut id = 0u16; let mut sequence = 1; + let mut map = Map::new(); + map.insert("port".to_string(), Value::Integer(1883)); + map.insert("network".to_string(), Value::String("regtest".to_string())); + let settings = Value::Table(map); + let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); - let runtime = start_broker(rx, status_tx, CLIENT_ID); + let runtime = start_broker(rx, status_tx, CLIENT_ID, &settings); runtime.block_on(async { let mut connected = false; loop { diff --git a/broker/src/util.rs b/broker/src/util.rs index a2ae7d7..1129a33 100644 --- a/broker/src/util.rs +++ b/broker/src/util.rs @@ -1,5 +1,22 @@ use std::env; +use std::fs; use std::str::FromStr; +use toml::map::Map; +use toml::Value; + +pub fn read_broker_config(config_path: &str) -> Value { + let mut ret = Value::Table(Map::new()); + if let Ok(set) = fs::read_to_string(config_path) { + ret = Value::from_str(&set) + .expect("Couldn't read broker.conf make sure it follows the toml format"); + log::info!("Read broker.conf"); + } else { + log::info!("File broker.conf not found, using default settings"); + } + validate_network_setting(&mut ret); + validate_port_setting(&mut ret); + ret +} pub fn setup_logging(who: &str, level_arg: &str) { use fern::colors::{Color, ColoredLevelConfig}; @@ -32,3 +49,45 @@ pub fn setup_logging(who: &str, level_arg: &str) { .apply() .expect("log config"); } + +fn validate_network_setting(settings: &mut Value) { + if let None = settings.get("network") { + log::info!("Network not specified, setting to default regtest"); + settings + .as_table_mut() + .unwrap() + .insert("network".to_string(), Value::String("regtest".to_string())); + } else { + if !settings["network"].is_str() + || settings["network"].as_str().unwrap() != "bitcoin" + && settings["network"].as_str().unwrap() != "regtest" + { + panic!("The network must be set to either 'bitcoin' or 'regtest'"); + } + log::info!( + "Read network setting: {}", + settings["network"].as_str().unwrap() + ); + } +} + +fn validate_port_setting(settings: &mut Value) { + if let None = settings.get("port") { + log::info!("Broker port not specified, setting to default 1883"); + settings + .as_table_mut() + .unwrap() + .insert("port".to_string(), Value::Integer(1883)); + } else { + let temp = settings["port"] + .as_integer() + .expect("The port number is not an integer greater than 1023"); + if temp <= 1023 { + panic!("The port number is not an integer greater than 1023") + } + if temp > u16::MAX.into() { + panic!("The port number is way too big!") + } + log::info!("Read broker port setting: {}", temp); + } +}