mirror of
https://github.com/stakwork/sphinx-key.git
synced 2025-12-18 07:44:21 +01:00
Merge pull request #67 from stakwork/http-port-setting
Add http port setting
This commit is contained in:
@@ -98,7 +98,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
|
|||||||
let (status_tx, mut status_rx) = mpsc::channel(1000);
|
let (status_tx, mut status_rx) = mpsc::channel(1000);
|
||||||
let (error_tx, _) = broadcast::channel(1000);
|
let (error_tx, _) = broadcast::channel(1000);
|
||||||
log::info!("=> start broker on network: {}", settings.network);
|
log::info!("=> start broker on network: {}", settings.network);
|
||||||
start_broker(rx, status_tx, error_tx.clone(),CLIENT_ID, &settings).await;
|
start_broker(rx, status_tx, error_tx.clone(),CLIENT_ID, settings).await;
|
||||||
log::info!("=> wait for connected status");
|
log::info!("=> wait for connected status");
|
||||||
// wait for connection = true
|
// wait for connection = true
|
||||||
let status = status_rx.recv().await.expect("couldnt receive");
|
let status = status_rx.recv().await.expect("couldnt receive");
|
||||||
@@ -124,8 +124,8 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
|
|||||||
let mut signer_loop = SignerLoop::new(client, tx.clone());
|
let mut signer_loop = SignerLoop::new(client, tx.clone());
|
||||||
// spawn CLN listener on a std thread
|
// spawn CLN listener on a std thread
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
signer_loop.start(Some(&settings));
|
signer_loop.start(Some(settings));
|
||||||
});
|
});
|
||||||
|
|
||||||
routes::launch_rocket(tx, error_tx)
|
routes::launch_rocket(tx, error_tx, settings)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ pub async fn start_broker(
|
|||||||
status_sender: mpsc::Sender<bool>,
|
status_sender: mpsc::Sender<bool>,
|
||||||
error_sender: broadcast::Sender<Vec<u8>>,
|
error_sender: broadcast::Sender<Vec<u8>>,
|
||||||
expected_client_id: &str,
|
expected_client_id: &str,
|
||||||
settings: &Settings,
|
settings: Settings,
|
||||||
) {
|
) {
|
||||||
let config = config(settings);
|
let config = config(settings);
|
||||||
let client_id = expected_client_id.to_string();
|
let client_id = expected_client_id.to_string();
|
||||||
@@ -144,7 +144,7 @@ fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Opti
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn config(settings: &Settings) -> Config {
|
fn config(settings: Settings) -> Config {
|
||||||
use librumqttd::rumqttlog::Config as RouterConfig;
|
use librumqttd::rumqttlog::Config as RouterConfig;
|
||||||
use librumqttd::{ConnectionSettings, SphinxLoginCredentials, ConsoleSettings, ServerSettings};
|
use librumqttd::{ConnectionSettings, SphinxLoginCredentials, ConsoleSettings, ServerSettings};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@@ -163,7 +163,7 @@ fn config(settings: &Settings) -> Config {
|
|||||||
id.to_string(),
|
id.to_string(),
|
||||||
ServerSettings {
|
ServerSettings {
|
||||||
cert: None,
|
cert: None,
|
||||||
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), settings.port).into(),
|
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), settings.mqtt_port).into(),
|
||||||
next_connection_delay_ms: 1,
|
next_connection_delay_ms: 1,
|
||||||
connections: ConnectionSettings {
|
connections: ConnectionSettings {
|
||||||
connection_timeout_ms: 5000,
|
connection_timeout_ms: 5000,
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use crate::util::Settings;
|
||||||
use crate::ChannelRequest;
|
use crate::ChannelRequest;
|
||||||
use rocket::fairing::{Fairing, Info, Kind};
|
use rocket::fairing::{Fairing, Info, Kind};
|
||||||
use rocket::http::Header;
|
use rocket::http::Header;
|
||||||
@@ -6,6 +7,8 @@ use rocket::response::stream::{EventStream, Event};
|
|||||||
use rocket::tokio::select;
|
use rocket::tokio::select;
|
||||||
use rocket::*;
|
use rocket::*;
|
||||||
use sphinx_key_parser::{topics, error::Error as ParserError};
|
use sphinx_key_parser::{topics, error::Error as ParserError};
|
||||||
|
use std::net::IpAddr::V4;
|
||||||
|
use std::net::Ipv4Addr;
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
@@ -43,8 +46,14 @@ async fn errors(error_tx: &State<broadcast::Sender<Vec<u8>>>, mut end: Shutdown)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn launch_rocket(tx: Sender<ChannelRequest>, error_tx: broadcast::Sender<Vec<u8>>) -> Rocket<Build> {
|
pub fn launch_rocket(tx: Sender<ChannelRequest>, error_tx: broadcast::Sender<Vec<u8>>, settings: Settings) -> Rocket<Build> {
|
||||||
|
let config = Config {
|
||||||
|
address: V4(Ipv4Addr::UNSPECIFIED),
|
||||||
|
port: settings.http_port,
|
||||||
|
..Config::debug_default()
|
||||||
|
};
|
||||||
rocket::build()
|
rocket::build()
|
||||||
|
.configure(config)
|
||||||
.mount("/api/", routes![control, errors])
|
.mount("/api/", routes![control, errors])
|
||||||
.attach(CORS)
|
.attach(CORS)
|
||||||
.manage(tx)
|
.manage(tx)
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ pub async fn run_test() -> rocket::Rocket<rocket::Build> {
|
|||||||
let (tx, rx) = mpsc::channel(1000);
|
let (tx, rx) = mpsc::channel(1000);
|
||||||
let (status_tx, mut status_rx) = mpsc::channel(1000);
|
let (status_tx, mut status_rx) = mpsc::channel(1000);
|
||||||
let (error_tx, _) = broadcast::channel(1000);
|
let (error_tx, _) = broadcast::channel(1000);
|
||||||
start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, &settings).await;
|
start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, settings).await;
|
||||||
let mut connected = false;
|
let mut connected = false;
|
||||||
let tx_ = tx.clone();
|
let tx_ = tx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@@ -50,7 +50,7 @@ pub async fn run_test() -> rocket::Rocket<rocket::Build> {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
launch_rocket(tx, error_tx)
|
launch_rocket(tx, error_tx, settings)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
|||||||
@@ -61,7 +61,7 @@ impl<C: 'static + Client> SignerLoop<C> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Start the read loop
|
/// Start the read loop
|
||||||
pub fn start(&mut self, settings: Option<&Settings>) {
|
pub fn start(&mut self, settings: Option<Settings>) {
|
||||||
info!("loop {}: start", self.log_prefix);
|
info!("loop {}: start", self.log_prefix);
|
||||||
match self.do_loop(settings) {
|
match self.do_loop(settings) {
|
||||||
Ok(()) => info!("loop {}: done", self.log_prefix),
|
Ok(()) => info!("loop {}: done", self.log_prefix),
|
||||||
@@ -70,7 +70,7 @@ impl<C: 'static + Client> SignerLoop<C> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn do_loop(&mut self, settings: Option<&Settings>) -> Result<()> {
|
fn do_loop(&mut self, settings: Option<Settings>) -> Result<()> {
|
||||||
loop {
|
loop {
|
||||||
let raw_msg = self.client.read_raw()?;
|
let raw_msg = self.client.read_raw()?;
|
||||||
debug!("loop {}: got raw", self.log_prefix);
|
debug!("loop {}: got raw", self.log_prefix);
|
||||||
|
|||||||
@@ -5,16 +5,19 @@ use std::fs;
|
|||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use toml::Value;
|
use toml::Value;
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug)]
|
||||||
pub struct Settings {
|
pub struct Settings {
|
||||||
|
pub http_port: u16,
|
||||||
|
pub mqtt_port: u16,
|
||||||
pub network: Network,
|
pub network: Network,
|
||||||
pub port: u16,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Settings {
|
impl Default for Settings {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Settings {
|
Settings {
|
||||||
|
http_port: 8000,
|
||||||
|
mqtt_port: 1883,
|
||||||
network: Network::Regtest,
|
network: Network::Regtest,
|
||||||
port: 1883,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -27,8 +30,11 @@ pub fn read_broker_config(config_path: &str) -> Settings {
|
|||||||
if let Some(network) = read_network_setting(&table) {
|
if let Some(network) = read_network_setting(&table) {
|
||||||
settings.network = network;
|
settings.network = network;
|
||||||
}
|
}
|
||||||
if let Some(port) = read_port_setting(&table) {
|
if let Some(mqtt_port) = read_mqtt_port_setting(&table) {
|
||||||
settings.port = port;
|
settings.mqtt_port = mqtt_port;
|
||||||
|
}
|
||||||
|
if let Some(http_port) = read_http_port_setting(&table) {
|
||||||
|
settings.http_port = http_port;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log::info!("File broker.conf not found, using default settings");
|
log::info!("File broker.conf not found, using default settings");
|
||||||
@@ -38,10 +44,17 @@ pub fn read_broker_config(config_path: &str) -> Settings {
|
|||||||
settings.network = net;
|
settings.network = net;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Ok(env_port) = env::var("BROKER_PORT") {
|
if let Ok(env_port) = env::var("BROKER_MQTT_PORT") {
|
||||||
if let Ok(port) = env_port.parse::<u16>() {
|
if let Ok(mqtt_port) = env_port.parse::<u16>() {
|
||||||
if port > 1023 {
|
if mqtt_port > 1023 {
|
||||||
settings.port = port;
|
settings.mqtt_port = mqtt_port;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Ok(env_port) = env::var("BROKER_HTTP_PORT") {
|
||||||
|
if let Ok(http_port) = env_port.parse::<u16>() {
|
||||||
|
if http_port > 1023 {
|
||||||
|
settings.http_port = http_port;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -99,22 +112,42 @@ fn read_network_setting(table: &Value) -> Option<Network> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_port_setting(table: &Value) -> Option<u16> {
|
fn read_mqtt_port_setting(table: &Value) -> Option<u16> {
|
||||||
if let None = table.get("port") {
|
if let None = table.get("mqtt_port") {
|
||||||
log::info!("Broker port not specified, setting to default 1883");
|
log::info!("Broker mqtt port not specified, setting to default 1883");
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
let temp = table["port"]
|
let temp = table["mqtt_port"]
|
||||||
.as_integer()
|
.as_integer()
|
||||||
.expect("The port number is not an integer greater than 1023");
|
.expect("The mqtt port number is not an integer greater than 1023");
|
||||||
if temp <= 1023 {
|
if temp <= 1023 {
|
||||||
panic!("The port number is not an integer greater than 1023")
|
panic!("The mqtt port number is not an integer greater than 1023")
|
||||||
}
|
}
|
||||||
let max: i64 = u16::MAX.into();
|
let max: i64 = u16::MAX.into();
|
||||||
if temp > max {
|
if temp > max {
|
||||||
panic!("The port number is way too big!")
|
panic!("The mqtt port number is way too big!")
|
||||||
}
|
}
|
||||||
log::info!("Read broker port setting: {}", temp);
|
log::info!("Read broker mqtt port setting: {}", temp);
|
||||||
|
Some(temp.try_into().unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_http_port_setting(table: &Value) -> Option<u16> {
|
||||||
|
if let None = table.get("http_port") {
|
||||||
|
log::info!("Broker http port not specified, setting to default 8000");
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
let temp = table["http_port"]
|
||||||
|
.as_integer()
|
||||||
|
.expect("The http port number is not an integer greater than 1023");
|
||||||
|
if temp <= 1023 {
|
||||||
|
panic!("The http port number is not an integer greater than 1023")
|
||||||
|
}
|
||||||
|
let max: i64 = u16::MAX.into();
|
||||||
|
if temp > max {
|
||||||
|
panic!("The http port number is way too big!")
|
||||||
|
}
|
||||||
|
log::info!("Read broker http port setting: {}", temp);
|
||||||
Some(temp.try_into().unwrap())
|
Some(temp.try_into().unwrap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user