Merge pull request #57 from stakwork/broker-config-file

Add broker config file with network and port settings
This commit is contained in:
decentclock
2022-08-31 16:07:17 -04:00
committed by GitHub
8 changed files with 121 additions and 49 deletions

View File

@@ -27,6 +27,7 @@ once_cell = "1.12.0"
bitcoin = "0.29.0"
async-trait = "0.1"
url = { version = "2.2" }
toml = "0.5.9"
[features]
default = ["std"]

View File

@@ -1,28 +0,0 @@
# Broker id. Used to identify local node of the replication mesh
id = 0
# A commitlog read will pull full segment. Make sure that a segment isn't
# too big as async tcp writes readiness of one connection might affect tail
# latencies of other connection. Not a problem with preempting runtimes
[router]
id = 0
dir = "/tmp/rumqttd"
max_segment_size = 10240
max_segment_count = 10
max_connections = 10001
# Configuration of server and connections that it accepts
[servers.1]
listen = "0.0.0.0:1883"
next_connection_delay_ms = 1
[servers.1.connections]
connection_timeout_ms = 5000
max_client_id_len = 256
throttle_delay_ms = 0
max_payload_size = 5120
max_inflight_count = 200
max_inflight_size = 1024
login_credentials = [ { username = "sphinx-key", password = "sphinx-key-pass" } ]
[console]
listen = "0.0.0.0:3030"

View File

@@ -9,10 +9,9 @@ mod util;
use crate::chain_tracker::MqttSignerPort;
use crate::mqtt::start_broker;
use crate::unix_fd::SignerLoop;
use bitcoin::Network;
use crate::util::read_broker_config;
use clap::{App, AppSettings, Arg};
use std::env;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use url::Url;
@@ -39,6 +38,8 @@ pub struct ChannelReply {
pub reply: Vec<u8>,
}
const BROKER_CONFIG_PATH: &str = "../broker.conf";
fn main() -> anyhow::Result<()> {
let parent_fd = open_parent_fd();
@@ -71,17 +72,12 @@ fn main() -> anyhow::Result<()> {
return Ok(());
}
let net_var = env::var("VLS_NETWORK").unwrap_or("regtest".to_string());
let net_var = match net_var.as_str() {
ret @ ("bitcoin" | "regtest") => ret,
_ => panic!("Please set VLS_NETWORK to either 'bitcoin' or 'regtest'"),
};
let network = Network::from_str(net_var).unwrap();
let settings = read_broker_config(BROKER_CONFIG_PATH);
let (tx, rx) = mpsc::channel(1000);
let (status_tx, mut status_rx) = mpsc::channel(1000);
log::info!("=> start broker on network: {}", network);
let runtime = start_broker(rx, status_tx, "sphinx-1");
log::info!("=> start broker on network: {}", settings.network);
let runtime = start_broker(rx, status_tx, "sphinx-1", &settings);
log::info!("=> wait for connected status");
// wait for connection = true
let status = status_rx.blocking_recv().expect("couldnt receive");
@@ -94,7 +90,7 @@ fn main() -> anyhow::Result<()> {
let frontend = Frontend::new(
Arc::new(SignerPortFront {
signer_port: Box::new(signer_port),
network,
network: settings.network,
}),
Url::parse(&btc_url).expect("malformed btc rpc url"),
);
@@ -107,7 +103,7 @@ fn main() -> anyhow::Result<()> {
let client = UnixClient::new(conn);
// TODO pass status_rx into SignerLoop
let mut signer_loop = SignerLoop::new(client, tx);
signer_loop.start();
signer_loop.start(Some(&settings));
// })
Ok(())

View File

@@ -1,3 +1,4 @@
use crate::util::Settings;
use crate::{ChannelReply, ChannelRequest};
use librumqttd::{
async_locallink,
@@ -31,8 +32,9 @@ pub fn start_broker(
mut receiver: mpsc::Receiver<ChannelRequest>,
status_sender: mpsc::Sender<bool>,
expected_client_id: &str,
settings: &Settings,
) -> tokio::runtime::Runtime {
let config = config();
let config = config(settings);
let client_id = expected_client_id.to_string();
let (mut router, servers, builder) = async_locallink::construct(config.clone());
@@ -146,7 +148,7 @@ fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Opti
}
}
fn config() -> Config {
fn config(settings: &Settings) -> Config {
use librumqttd::rumqttlog::Config as RouterConfig;
use librumqttd::{
ConnectionLoginCredentials, ConnectionSettings, ConsoleSettings, ServerSettings,
@@ -167,7 +169,7 @@ fn config() -> Config {
id.to_string(),
ServerSettings {
cert: None,
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(),
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), settings.port).into(),
next_connection_delay_ms: 1,
connections: ConnectionSettings {
connection_timeout_ms: 5000,

View File

@@ -1,4 +1,5 @@
use crate::mqtt::start_broker;
use crate::util::Settings;
use crate::ChannelRequest;
use sphinx_key_parser as parser;
use tokio::sync::{mpsc, oneshot};
@@ -13,9 +14,11 @@ pub fn run_test() {
let mut id = 0u16;
let mut sequence = 1;
let settings = Settings::default();
let (tx, rx) = mpsc::channel(1000);
let (status_tx, mut status_rx) = mpsc::channel(1000);
let runtime = start_broker(rx, status_tx, CLIENT_ID);
let runtime = start_broker(rx, status_tx, CLIENT_ID, &settings);
runtime.block_on(async {
let mut connected = false;
loop {

View File

@@ -1,4 +1,6 @@
use crate::util::Settings;
use crate::{Channel, ChannelReply, ChannelRequest};
use bitcoin::blockdata::constants::ChainHash;
use log::*;
use secp256k1::PublicKey;
use sphinx_key_parser as parser;
@@ -58,16 +60,16 @@ impl<C: 'static + Client> SignerLoop<C> {
}
/// Start the read loop
pub fn start(&mut self) {
pub fn start(&mut self, settings: Option<&Settings>) {
info!("loop {}: start", self.log_prefix);
match self.do_loop() {
match self.do_loop(settings) {
Ok(()) => info!("loop {}: done", self.log_prefix),
Err(Error::Eof) => info!("loop {}: ending", self.log_prefix),
Err(e) => error!("loop {}: error {:?}", self.log_prefix, e),
}
}
fn do_loop(&mut self) -> Result<()> {
fn do_loop(&mut self, settings: Option<&Settings>) -> Result<()> {
loop {
let raw_msg = self.client.read_raw()?;
debug!("loop {}: got raw", self.log_prefix);
@@ -85,13 +87,24 @@ impl<C: 'static + Client> SignerLoop<C> {
};
let mut new_loop =
SignerLoop::new_for_client(new_client, self.chan.sender.clone(), client_id);
thread::spawn(move || new_loop.start());
thread::spawn(move || new_loop.start(None));
}
Message::Memleak(_) => {
let reply = msgs::MemleakReply { result: false };
self.client.write(reply)?;
}
_ => {
msg => {
if let Message::HsmdInit(m) = msg {
if let Some(set) = settings {
if ChainHash::using_genesis_block(set.network).as_bytes()
!= &m.chain_params.0
{
panic!("The network settings of CLN and broker don't match!");
}
} else {
panic!("Got HsmdInit without settings - likely because HsmdInit was sent after startup");
}
}
let reply = self.handle_message(raw_msg)?;
// Write the reply to the node
self.client.write_vec(reply)?;

View File

@@ -1,5 +1,41 @@
use bitcoin::Network;
use std::default::Default;
use std::env;
use std::fs;
use std::str::FromStr;
use toml::Value;
pub struct Settings {
pub network: Network,
pub port: u16,
}
impl Default for Settings {
fn default() -> Self {
Settings {
network: Network::Regtest,
port: 1883,
}
}
}
pub fn read_broker_config(config_path: &str) -> Settings {
let mut settings = Settings::default();
if let Ok(set) = fs::read_to_string(config_path) {
let table = Value::from_str(&set)
.expect("Couldn't read broker.conf make sure it follows the toml format");
log::info!("Read broker.conf");
if let Some(network) = read_network_setting(&table) {
settings.network = network
}
if let Some(port) = read_port_setting(&table) {
settings.port = port
}
} else {
log::info!("File broker.conf not found, using default settings");
}
settings
}
pub fn setup_logging(who: &str, level_arg: &str) {
use fern::colors::{Color, ColoredLevelConfig};
@@ -32,3 +68,41 @@ pub fn setup_logging(who: &str, level_arg: &str) {
.apply()
.expect("log config");
}
fn read_network_setting(table: &Value) -> Option<Network> {
if let None = table.get("network") {
log::info!("Network not specified, setting to default regtest");
None
} else {
if !table["network"].is_str()
|| table["network"].as_str().unwrap() != "bitcoin"
&& table["network"].as_str().unwrap() != "regtest"
{
panic!("The network must be set to either 'bitcoin' or 'regtest'");
}
log::info!(
"Read network setting: {}",
table["network"].as_str().unwrap()
);
Some(Network::from_str(table["network"].as_str().unwrap()).unwrap())
}
}
fn read_port_setting(table: &Value) -> Option<u16> {
if let None = table.get("port") {
log::info!("Broker port not specified, setting to default 1883");
None
} else {
let temp = table["port"]
.as_integer()
.expect("The port number is not an integer greater than 1023");
if temp <= 1023 {
panic!("The port number is not an integer greater than 1023")
}
if temp > u16::MAX.into() {
panic!("The port number is way too big!")
}
log::info!("Read broker port setting: {}", temp);
Some(temp.try_into().unwrap())
}
}

View File

@@ -1,5 +1,6 @@
mod randomstartingtime;
use lightning_signer::bitcoin::blockdata::constants::ChainHash;
use lightning_signer::node::NodeServices;
use lightning_signer::persist::Persist;
use lightning_signer::policy::filter::PolicyFilter;
@@ -16,6 +17,7 @@ use vls_protocol::msgs::{self, read_serial_request_header, write_serial_response
use vls_protocol::serde_bolt::WireString;
use vls_protocol_signer::handler::{Handler, RootHandler};
use vls_protocol_signer::lightning_signer::bitcoin::Network;
use vls_protocol_signer::lightning_signer::wallet::Wallet;
pub use sphinx_key_parser::MsgDriver;
pub use sphinx_key_persister::FsPersister;
@@ -95,6 +97,15 @@ pub fn handle(
_ => {}
};
if let Message::HsmdInit(ref m) = message {
if ChainHash::using_genesis_block(root_handler.node.network()).as_bytes()
!= &m.chain_params.0
{
log::error!("The network setting of CLN and VLS don't match!");
panic!("The network setting of CLN and VLS don't match!");
}
}
if do_log {
log::info!("VLS msg: {:?}", message);
}