diff --git a/Cargo.toml b/Cargo.toml index c6908db..3a1c696 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ members = [ "signer", "broker", "parser", - "tester" + "auther", + "tester", ] exclude = [ diff --git a/auther/Cargo.toml b/auther/Cargo.toml new file mode 100644 index 0000000..6deb5cd --- /dev/null +++ b/auther/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "sphinx-key-auther" +version = "0.1.0" +authors = ["Evan Feenstra "] +edition = "2018" + +[dependencies] +secp256k1 = { version = "0.22.0", default-features = false, features = ["std", "rand-std", "bitcoin_hashes"] } +anyhow = {version = "1", features = ["backtrace"]} +log = "0.4" + +[features] +default = [ "no-std", "secp-recovery", "secp-lowmemory" ] +no-std = ["secp256k1/alloc"] +secp-lowmemory = ["secp256k1/lowmemory"] +secp-recovery = ["secp256k1/recovery"] +rand = ["secp256k1/rand-std"] \ No newline at end of file diff --git a/auther/src/lib.rs b/auther/src/lib.rs new file mode 100644 index 0000000..e2c4695 --- /dev/null +++ b/auther/src/lib.rs @@ -0,0 +1,64 @@ +use secp256k1::ecdsa::Signature; +use secp256k1::hashes::sha256d::Hash as Sha256dHash; +use secp256k1::hashes::Hash; +use secp256k1::{Message, Secp256k1, SecretKey}; + +pub struct Token(u64); + +impl Token { + pub fn new() -> Self { + Self(0) + } + /// Sign a Lightning message + pub fn sign_message( + &self, + message: &Vec, + secret_key: &SecretKey, + ) -> anyhow::Result> { + let mut buffer = String::from("Lightning Signed Message:").into_bytes(); + buffer.extend(message); + let secp_ctx = Secp256k1::signing_only(); + let hash = Sha256dHash::hash(&buffer); + let encmsg = secp256k1::Message::from_slice(&hash[..])?; + let sig = secp_ctx.sign_ecdsa_recoverable(&encmsg, &secret_key); + let (rid, sig) = sig.serialize_compact(); + let mut res = sig.to_vec(); + res.push(rid.to_i32() as u8); + Ok(res) + } +} + +pub fn sign( + secp: &Secp256k1, + input: Vec, + secret_key: &SecretKey, +) -> Signature { + let message = hash_message(input); + secp.sign_ecdsa(&message, &secret_key) +} + +pub fn hash_message(input: Vec) -> Message { + let hash = Sha256dHash::hash(&input); + Message::from_slice(&hash[..]).expect("encmsg failed") +} + +#[cfg(test)] +mod tests { + use crate::*; + use secp256k1::{PublicKey, Secp256k1, SecretKey}; + + fn secret_key() -> SecretKey { + SecretKey::from_slice(&[0xcd; 32]).expect("32 bytes, within curve order") + } + + #[test] + fn test_sign() { + let secp = Secp256k1::new(); + let sk = secret_key(); + let public_key = PublicKey::from_secret_key(&secp, &sk); + let input = vec![1, 2, 3]; + let message = hash_message(input); + let sig = sign(&secp, vec![1, 2, 3], &sk); + assert!(secp.verify_ecdsa(&message, &sig, &public_key).is_ok()); + } +} diff --git a/broker/Cargo.toml b/broker/Cargo.toml index deb5652..434467f 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -6,8 +6,7 @@ default-run = "sphinx-key-broker" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -# rumqttd = "0.11.0" -rumqttd = { path = "../../rumqtt/rumqttd" } +rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" } pretty_env_logger = "0.4.0" confy = "0.4.0" tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } @@ -26,8 +25,3 @@ chrono = "0.4" [features] default = ["std"] std = ["vls-protocol/std"] - -[[bin]] -name = "test_client" -path = "src/test_client.rs" -# ./target/debug/test_client \ No newline at end of file diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 15340bb..fd09938 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,9 +1,11 @@ use crate::{ChannelReply, ChannelRequest}; use librumqttd::{ - async_locallink::construct_broker, + async_locallink, consolelink::{self, ConsoleLink}, + rumqttlog::router::ConnectionMetrics, Config, }; + use std::sync::Arc; use std::thread; use std::time::Duration; @@ -12,16 +14,18 @@ use tokio::time::timeout; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; +const USERNAME: &str = "sphinx-key"; +const PASSWORD: &str = "sphinx-key-pass"; pub fn start_broker( mut receiver: mpsc::Receiver, status_sender: mpsc::Sender, expected_client_id: &str, ) -> tokio::runtime::Runtime { - let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); + let config = config(); let client_id = expected_client_id.to_string(); - let (mut router, servers, builder) = construct_broker(config.clone()); + let (mut router, servers, builder) = async_locallink::construct(config.clone()); thread::spawn(move || { router.start().expect("could not start router"); @@ -29,9 +33,6 @@ pub fn start_broker( let mut client_connected = false; - // let (status_tx, mut status_rx): (mpsc::Sender, mpsc::Receiver) = - // mpsc::channel(1000); - let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); rt_builder.enable_all(); let rt = rt_builder.build().unwrap(); @@ -39,8 +40,9 @@ pub fn start_broker( tokio::spawn(async move { let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = mpsc::channel(1000); - let (mut tx, mut rx) = builder.clone().connect("localclient", 200).await.unwrap(); - tx.subscribe([SUB_TOPIC]).await.unwrap(); + let (mut link_tx, mut link_rx) = + builder.clone().connect("localclient", 200).await.unwrap(); + link_tx.subscribe([SUB_TOPIC]).await.unwrap(); let router_tx = builder.router_tx(); tokio::spawn(async move { @@ -49,40 +51,19 @@ pub fn start_broker( let console: Arc = Arc::new(ConsoleLink::new(config, router_tx)); loop { let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); - let changed: Option = match metrics.tracker() { - Some(t) => { - // wait for subscription to be sure - if t.concrete_subscriptions_len() > 0 { - if !client_connected { - Some(true) // changed to true - } else { - None - } - } else { - None - } - } - None => { - if client_connected { - Some(false) - } else { - None - } - } - }; - if let Some(c) = changed { + if let Some(c) = metrics_to_status(metrics, client_connected) { client_connected = c; status_sender .send(c) .await .expect("couldnt send connection status"); } - tokio::time::sleep(Duration::from_millis(800)).await; + tokio::time::sleep(Duration::from_millis(500)).await; } }); let sub_task = tokio::spawn(async move { - while let Ok(message) = rx.recv().await { + while let Ok(message) = link_rx.recv().await { for payload in message.payload { if let Err(e) = msg_tx.send(payload.to_vec()).await { log::warn!("pub err {:?}", e); @@ -93,7 +74,8 @@ pub fn start_broker( let relay_task = tokio::spawn(async move { while let Some(msg) = receiver.recv().await { - tx.publish(PUB_TOPIC, false, msg.message) + link_tx + .publish(PUB_TOPIC, false, msg.message) .await .expect("could not mqtt pub"); if let Ok(reply) = timeout(Duration::from_millis(1000), msg_rx.recv()).await { @@ -114,3 +96,76 @@ pub fn start_broker( rt } + +fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option { + match metrics.tracker() { + Some(t) => { + // wait for subscription to be sure + if t.concrete_subscriptions_count() > 0 { + if !client_connected { + Some(true) // changed to true + } else { + None + } + } else { + None + } + } + None => { + if client_connected { + Some(false) + } else { + None + } + } + } +} + +fn config() -> Config { + use librumqttd::rumqttlog::Config as RouterConfig; + use librumqttd::{ + ConnectionLoginCredentials, ConnectionSettings, ConsoleSettings, ServerSettings, + }; + use std::collections::HashMap; + use std::net::{Ipv4Addr, SocketAddrV4}; + use std::path::PathBuf; + let id = 0; + let router = RouterConfig { + id, + dir: PathBuf::from("/tmp/rumqttd"), + max_segment_size: 10240, + max_segment_count: 10, + max_connections: 10001, + }; + let mut servers = HashMap::new(); + servers.insert( + "0".to_string(), + ServerSettings { + cert: None, + listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(), + next_connection_delay_ms: 1, + connections: ConnectionSettings { + connection_timeout_ms: 5000, + max_client_id_len: 256, + throttle_delay_ms: 0, + max_payload_size: 5120, + max_inflight_count: 200, + max_inflight_size: 1024, + login_credentials: Some(vec![ConnectionLoginCredentials { + username: USERNAME.to_string(), + password: PASSWORD.to_string(), + }]), + }, + }, + ); + Config { + id, + servers, + router, + console: ConsoleSettings { + listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030).into(), + }, + cluster: None, + replicator: None, + } +} diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 952cc8f..2039f72 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -5,6 +5,8 @@ use tokio::sync::{mpsc, oneshot}; use vls_protocol::serde_bolt::WireString; use vls_protocol::{msgs, msgs::Message}; +const CLIENT_ID: &str = "test-1"; + pub fn run_test() { log::info!("TEST..."); @@ -13,7 +15,7 @@ pub fn run_test() { let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); - let runtime = start_broker(rx, status_tx, "test-1"); + let runtime = start_broker(rx, status_tx, CLIENT_ID); runtime.block_on(async { let mut connected = false; loop { @@ -23,7 +25,7 @@ pub fn run_test() { connected = connection_status; id = 0; sequence = 1; - log::info!("========> CONNETED! {}", connection_status); + log::info!("========> CONNECTED! {}", connection_status); } } res = iteration(id, sequence, tx.clone(), connected) => { diff --git a/broker/src/test_client.rs b/broker/src/test_client.rs deleted file mode 100644 index 7bf0f80..0000000 --- a/broker/src/test_client.rs +++ /dev/null @@ -1,59 +0,0 @@ - -use sphinx_key_parser::MsgDriver; - -use tokio::{task, time}; -use rumqttc::{self, AsyncClient, MqttOptions, QoS, Event, Packet}; -use std::error::Error; -use std::time::Duration; -use vls_protocol::msgs; - -#[tokio::main(worker_threads = 1)] -async fn main() -> Result<(), Box> { - pretty_env_logger::init(); - // color_backtrace::install(); - - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); - mqttoptions.set_credentials("sphinx-key", "sphinx-key-pass"); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - task::spawn(async move { - requests(client).await; - time::sleep(Duration::from_secs(3)).await; - }); - - loop { - let event = eventloop.poll().await; - // println!("{:?}", event.unwrap()); - if let Event::Incoming(packet) = event.unwrap() { - if let Packet::Publish(p) = packet { - let mut m = MsgDriver::new(p.payload.to_vec()); - let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header"); - assert_eq!(dbid, 0); - assert_eq!(sequence, 0); - let ping: msgs::Ping = - msgs::read_message(&mut m).expect("failed to read ping message"); - println!("INCOMING: {:?}", ping); - } - } - } -} - -async fn requests(client: AsyncClient) { - - client - .subscribe("sphinx", QoS::AtMostOnce) - .await - .unwrap(); - - for _ in 1..=10 { - client - .publish("trigger", QoS::AtMostOnce, false, vec![1; 1]) - .await - .unwrap(); - - time::sleep(Duration::from_secs(1)).await; - } - - time::sleep(Duration::from_secs(120)).await; -} \ No newline at end of file diff --git a/tester/src/main.rs b/tester/src/main.rs index df9fe5b..7e007e6 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -20,97 +20,116 @@ async fn main() -> Result<(), Box> { .setting(AppSettings::NoAutoVersion) .about("CLN:mqtt-tester - MQTT client signer") .arg(Arg::from("--test run a test against the embedded device")); - - let mut try_i = 0; - let (client, mut eventloop) = loop { - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); - mqttoptions.set_credentials(USERNAME, PASSWORD); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - match eventloop.poll().await { - Ok(event) => { - if let Some(_) = incoming_conn_ack(event) { - println!("==========> MQTT connected!"); - break (client, eventloop); + let matches = app.get_matches(); + let is_test = matches.is_present("test"); + // main loop - alternate between "reconnection" and "handler" + loop { + let mut try_i = 0; + let (client, mut eventloop) = loop { + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + mqttoptions.set_credentials(USERNAME, PASSWORD); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + match eventloop.poll().await { + Ok(event) => { + if let Some(_) = incoming_conn_ack(event) { + println!("==========> MQTT connected!"); + break (client, eventloop); + } + } + Err(_) => { + try_i = try_i + 1; + println!("reconnect.... {}", try_i); + tokio::time::sleep(Duration::from_secs(1)).await; } } - Err(_) => { - try_i = try_i + 1; - println!("reconnect.... {}", try_i); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - }; - - client - .subscribe(SUB_TOPIC, QoS::AtMostOnce) - .await - .expect("could not mqtt subscribe"); - - // client - // .publish( - // PUB_TOPIC, - // QoS::AtMostOnce, - // false, - // "READY".as_bytes().to_vec(), - // ) - // .await - // .expect("could not pub"); - - let matches = app.get_matches(); - if matches.is_present("test") { - loop { - let event = eventloop.poll().await.expect("failed to unwrap event"); - // println!("{:?}", event); - if let Some(ping_bytes) = incoming_bytes(event) { - let (ping, sequence, dbid): (msgs::Ping, u16, u64) = - parser::request_from_bytes(ping_bytes).expect("read ping header"); - println!("sequence {}", sequence); - println!("dbid {}", dbid); - println!("INCOMING: {:?}", ping); - let pong = msgs::Pong { - id: ping.id, - message: ping.message, - }; - let bytes = parser::raw_response_from_msg(pong, sequence) - .expect("couldnt parse raw response"); - client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, bytes) - .await - .expect("could not mqtt publish"); - } - } - } else { - // once the init loop is done, the root_handler is returned - let root_handler = loop { - let init_event = eventloop.poll().await.expect("failed to unwrap event"); - // this may be another kind of message like MQTT ConnAck - // loop around again and wait for the init - if let Some(init_msg_bytes) = incoming_bytes(init_event) { - let InitResponse { - root_handler, - init_reply, - } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); - client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, init_reply) - .await - .expect("could not publish init response"); - // return the root_handler and finish the init loop - break root_handler; - } }; - // the actual loop - loop { - let event = eventloop.poll().await.expect("failed to unwrap event"); - let dummy_peer = PubKey([0; 33]); - if let Some(msg_bytes) = incoming_bytes(event) { - match sphinx_key_signer::handle(&root_handler, msg_bytes, dummy_peer.clone()) { - Ok(b) => client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, b) - .await - .expect("could not publish init response"), - Err(e) => panic!("HANDLE FAILED {:?}", e), - }; + + client + .subscribe(SUB_TOPIC, QoS::AtMostOnce) + .await + .expect("could not mqtt subscribe"); + + if is_test { + // test handler loop + loop { + match eventloop.poll().await { + Ok(event) => { + // println!("{:?}", event); + if let Some(ping_bytes) = incoming_bytes(event) { + let (ping, sequence, dbid): (msgs::Ping, u16, u64) = + parser::request_from_bytes(ping_bytes).expect("read ping header"); + println!("sequence {}", sequence); + println!("dbid {}", dbid); + println!("INCOMING: {:?}", ping); + let pong = msgs::Pong { + id: ping.id, + message: ping.message, + }; + let bytes = parser::raw_response_from_msg(pong, sequence) + .expect("couldnt parse raw response"); + client + .publish(PUB_TOPIC, QoS::AtMostOnce, false, bytes) + .await + .expect("could not mqtt publish"); + } + } + Err(e) => { + log::warn!("diconnected {:?}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + break; // break out of this loop to reconnect + } + } + } + } else { + // once the init loop is done, the root_handler is returned + let root_handler = loop { + if let Ok(init_event) = eventloop.poll().await { + // this may be another kind of message like MQTT ConnAck + // loop around again and wait for the init + if let Some(init_msg_bytes) = incoming_bytes(init_event) { + let InitResponse { + root_handler, + init_reply, + } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); + client + .publish(PUB_TOPIC, QoS::AtMostOnce, false, init_reply) + .await + .expect("could not publish init response"); + // return the root_handler and finish the init loop + break Some(root_handler); + } + } else { + tokio::time::sleep(Duration::from_secs(1)).await; + log::warn!("failed to initialize! Lost connection"); + break None; + } + }; + // the actual handler loop + loop { + if let Some(rh) = &root_handler { + match eventloop.poll().await { + Ok(event) => { + let dummy_peer = PubKey([0; 33]); + if let Some(msg_bytes) = incoming_bytes(event) { + match sphinx_key_signer::handle(rh, msg_bytes, dummy_peer.clone()) { + Ok(b) => client + .publish(PUB_TOPIC, QoS::AtMostOnce, false, b) + .await + .expect("could not publish init response"), + Err(e) => panic!("HANDLE FAILED {:?}", e), + }; + } + } + Err(e) => { + log::warn!("diconnected {:?}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + break; // break out of this loop to reconnect + } + } + } else { + break; + } } } }