diff --git a/Cargo.toml b/Cargo.toml index 963a443..c6908db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,8 @@ members = [ "signer", "broker", - "parser" + "parser", + "tester" ] exclude = [ diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 2953f35..f789b29 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -9,13 +9,17 @@ default-run = "sphinx-key-broker" rumqttd = "0.11.0" pretty_env_logger = "0.4.0" confy = "0.4.0" -tokio = "1.18" +tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } vls-protocol = { path = "../../validating-lightning-signer/vls-protocol" } +vls-proxy = { path = "../../validating-lightning-signer/vls-proxy" } sphinx-key-parser = { path = "../parser" } secp256k1 = { version = "0.20", features = ["rand-std", "bitcoin_hashes"] } anyhow = {version = "1", features = ["backtrace"]} log = "0.4" +fern = { version = "0.6", features = ["colored"] } rumqttc = "0.12.0" +clap = "=3.0.0-beta.2" +clap_derive = "=3.0.0-beta.5" [features] default = ["std"] diff --git a/broker/src/main.rs b/broker/src/main.rs index e4ebb0b..8c83ac8 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,68 +1,68 @@ -use sphinx_key_parser::MsgDriver; -use librumqttd::{async_locallink::construct_broker, Config}; -use std::thread; -use vls_protocol::msgs; -use vls_protocol::serde_bolt::WireString; -use tokio::sync::mpsc; +mod mqtt; +mod run_test; +mod unix_fd; -const SUB_TOPIC: &str = "sphinx-return"; -const TRIGGER_TOPIC: &str = "trigger"; -const PUB_TOPIC: &str = "sphinx"; +use crate::unix_fd::SignerLoop; +use clap::{App, AppSettings, Arg}; +use crate::mqtt::start_broker; +use std::env; +use tokio::sync::{mpsc, oneshot}; +use vls_proxy::client::UnixClient; +use vls_proxy::connection::{open_parent_fd, UnixConnection}; +use vls_proxy::util::setup_logging; -fn main() { - pretty_env_logger::init(); - let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); - - let (mut router, console, servers, builder) = construct_broker(config); - - thread::spawn(move || { - router.start().unwrap(); - }); - - let mut rt = tokio::runtime::Builder::new_multi_thread(); - rt.enable_all(); - rt.build().unwrap().block_on(async { - let (msg_tx, mut msg_rx): (mpsc::UnboundedSender>, mpsc::UnboundedReceiver>) = mpsc::unbounded_channel(); - let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap(); - tx.subscribe([SUB_TOPIC, TRIGGER_TOPIC]).await.unwrap(); - - let console_task = tokio::spawn(console); - - let pub_task = tokio::spawn(async move { - while let Some(_) = msg_rx.recv().await { - let sequence = 0; - let mut md = MsgDriver::new_empty(); - msgs::write_serial_request_header(&mut md, sequence, 0).expect("failed to write_serial_request_header"); - let ping = msgs::Ping { - id: 0, - message: WireString("ping".as_bytes().to_vec()), - }; - msgs::write(&mut md, ping).expect("failed to serial write"); - tx.publish(PUB_TOPIC, false, md.bytes()).await.unwrap(); - } - }); - - let sub_task = tokio::spawn(async move { - loop { - let message = rx.recv().await.unwrap(); - // println!("T = {}, P = {:?}", message.topic, message.payload.len()); - // println!("count {}", message.payload.len()); - for payload in message.payload { - if let Err(e) = msg_tx.send(payload.to_vec()) { - println!("pub err {:?}", e); - } - } - } - }); - - servers.await; - println!("server awaited"); - pub_task.await.unwrap(); - println!("pub awaited"); - sub_task.await.unwrap(); - println!("sub awaited"); - console_task.await.unwrap(); - }); +pub struct Channel { + pub sequence: u16, + pub sender: mpsc::Sender, } +/// Responses are received on the oneshot sender +pub struct ChannelRequest { + pub message: Vec, + pub reply_tx: oneshot::Sender, +} +// mpsc reply +pub struct ChannelReply { + pub reply: Vec, +} + +fn main() -> anyhow::Result<()> { + let parent_fd = open_parent_fd(); + + setup_logging("hsmd ", "info"); + let app = App::new("signer") + .setting(AppSettings::NoAutoVersion) + .about("CLN:mqtt - connects to an embedded VLS over a MQTT connection") + .arg( + Arg::new("--dev-disconnect") + .about("ignored dev flag") + .long("dev-disconnect") + .takes_value(true), + ) + .arg(Arg::from("--log-io ignored dev flag")) + .arg(Arg::from("--version show a dummy version")) + .arg(Arg::from("--test run a test against the embedded device")); + let matches = app.get_matches(); + if matches.is_present("version") { + // Pretend to be the right version, given to us by an env var + let version = + env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning"); + println!("{}", version); + return Ok(()); + } + + if matches.is_present("test") { + run_test::run_test(); + } else { + let (tx, rx) = mpsc::channel(1000); + let _runtime = start_broker(true, rx); + // listen to reqs from CLN + let conn = UnixConnection::new(parent_fd); + let client = UnixClient::new(conn); + let mut signer_loop = SignerLoop::new(client, tx); + signer_loop.start(); + } + + Ok(()) +} diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs new file mode 100644 index 0000000..3762ada --- /dev/null +++ b/broker/src/mqtt.rs @@ -0,0 +1,81 @@ +use crate::{ChannelRequest,ChannelReply}; +use librumqttd::{async_locallink::construct_broker, Config}; +use std::thread; +use tokio::sync::{oneshot, mpsc}; + +const SUB_TOPIC: &str = "sphinx-return"; +const PUB_TOPIC: &str = "sphinx"; + +pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver) -> tokio::runtime::Runtime { + + let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); + + let (mut router, console, servers, builder) = construct_broker(config); + + thread::spawn(move || { + router.start().expect("could not start router"); + }); + + let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); + rt_builder.enable_all(); + let rt = rt_builder.build().unwrap(); + rt.block_on(async { + // channel to block until READY received + let (ready_tx, ready_rx) = oneshot::channel(); + tokio::spawn(async move { + let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = mpsc::channel(1000); + let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap(); + tx.subscribe([SUB_TOPIC]).await.unwrap(); + + let console_task = tokio::spawn(console); + + let sub_task = tokio::spawn(async move { + // ready message loop + // let ready_tx_ = ready_tx.clone(); + if wait_for_ready_message { + loop { + let message = rx.recv().await.unwrap(); + if let Some(payload) = message.payload.get(0) { + let content = String::from_utf8_lossy(&payload[..]); + if content == "READY" { + ready_tx.send(true).expect("could not send ready"); + break; + } + } + } + } + loop { + let message = rx.recv().await.unwrap(); + // println!("T = {}, P = {:?}", message.topic, message.payload.len()); + // println!("count {}", message.payload.len()); + for payload in message.payload { + if let Err(e) = msg_tx.send(payload.to_vec()).await { + println!("pub err {:?}", e); + } + } + } + }); + + let relay_task = tokio::spawn(async move { + while let Some(msg) = receiver.recv().await { + tx.publish(PUB_TOPIC, false, msg.message).await.expect("could not mqtt pub"); + let reply = msg_rx.recv().await.expect("could not unwrap msg_rx.recv()"); + if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) { + log::warn!("could not send on reply_tx"); + } + } + }); + + servers.await; + sub_task.await.unwrap(); + relay_task.await.unwrap(); + console_task.await.unwrap(); + }); + if wait_for_ready_message { + log::info!("waiting for READY..."); + ready_rx.await.expect("Could not receive from channel."); + } + }); + + rt +} diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs new file mode 100644 index 0000000..712fc7d --- /dev/null +++ b/broker/src/run_test.rs @@ -0,0 +1,66 @@ +use crate::mqtt::start_broker; +use crate::ChannelRequest; +use sphinx_key_parser::MsgDriver; +use tokio::sync::{mpsc, oneshot}; +use vls_protocol::serde_bolt::WireString; +use vls_protocol::{msgs, msgs::Message}; + +pub fn run_test() { + log::info!("TEST..."); + + let mut id = 0u16; + let mut sequence = 1; + + let (tx, rx) = mpsc::channel(1000); + let runtime = start_broker(true, rx); + log::info!("======> READY received! start now"); + runtime.block_on(async { + loop { + if let Err(e) = iteration(id, sequence, tx.clone()).await { + panic!("iteration failed {:?}", e); + } + sequence = sequence.wrapping_add(1); + id += 1; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + }); +} + +pub async fn iteration( + id: u16, + sequence: u16, + tx: mpsc::Sender, +) -> anyhow::Result<()> { + let mut md = MsgDriver::new_empty(); + msgs::write_serial_request_header(&mut md, sequence, 0)?; + let ping = msgs::Ping { + id, + message: WireString("ping".as_bytes().to_vec()), + }; + msgs::write(&mut md, ping)?; + let (reply_tx, reply_rx) = oneshot::channel(); + // Send a request to the MQTT handler to send to signer + let request = ChannelRequest { + message: md.bytes(), + reply_tx, + }; + let _ = tx.send(request).await; + let res = reply_rx.await?; + let mut ret = MsgDriver::new(res.reply); + msgs::read_serial_response_header(&mut ret, sequence)?; + let reply = msgs::read(&mut ret)?; + match reply { + Message::Pong(p) => { + log::info!( + "got reply {} {}", + p.id, + String::from_utf8(p.message.0).unwrap() + ); + assert_eq!(p.id, id); + } + _ => { + panic!("unknown response"); + } + } + Ok(()) +} diff --git a/broker/src/test_client.rs b/broker/src/test_client.rs index 9073a64..7bf0f80 100644 --- a/broker/src/test_client.rs +++ b/broker/src/test_client.rs @@ -27,7 +27,6 @@ async fn main() -> Result<(), Box> { // println!("{:?}", event.unwrap()); if let Event::Incoming(packet) = event.unwrap() { if let Packet::Publish(p) = packet { - // println!("incoming {:?}", p.payload); let mut m = MsgDriver::new(p.payload.to_vec()); let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header"); assert_eq!(dbid, 0); diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs new file mode 100644 index 0000000..37fbf3f --- /dev/null +++ b/broker/src/unix_fd.rs @@ -0,0 +1,137 @@ +use log::*; +use secp256k1::PublicKey; +use sphinx_key_parser::MsgDriver; +use std::thread; +use tokio::sync::{mpsc, oneshot}; +// use tokio::task::spawn_blocking; +use crate::{Channel, ChannelReply, ChannelRequest}; +use vls_protocol::{msgs, msgs::Message, Error, Result}; +use vls_proxy::client::Client; + +#[derive(Clone, Debug)] +pub struct ClientId { + pub peer_id: PublicKey, + pub dbid: u64, +} + +impl Channel { + pub fn new(sender: mpsc::Sender) -> Self { + Self { + sender, + sequence: 0, + } + } +} + +/// Implement the hsmd UNIX fd protocol. +pub struct SignerLoop { + client: C, + log_prefix: String, + chan: Channel, + client_id: Option, +} + +impl SignerLoop { + /// Create a loop for the root (lightningd) connection, but doesn't start it yet + pub fn new(client: C, sender: mpsc::Sender) -> Self { + let log_prefix = format!("{}/{}", std::process::id(), client.id()); + Self { + client, + log_prefix, + chan: Channel::new(sender), + client_id: None, + } + } + + // Create a loop for a non-root connection + fn new_for_client( + client: C, + sender: mpsc::Sender, + client_id: ClientId, + ) -> Self { + let log_prefix = format!("{}/{}", std::process::id(), client.id()); + Self { + client, + log_prefix, + chan: Channel::new(sender), + client_id: Some(client_id), + } + } + + /// Start the read loop + pub fn start(&mut self) { + info!("loop {}: start", self.log_prefix); + match self.do_loop() { + Ok(()) => info!("loop {}: done", self.log_prefix), + Err(Error::Eof) => info!("loop {}: ending", self.log_prefix), + Err(e) => error!("loop {}: error {:?}", self.log_prefix, e), + } + } + + fn do_loop(&mut self) -> Result<()> { + loop { + let raw_msg = self.client.read_raw()?; + debug!("loop {}: got raw", self.log_prefix); + let msg = msgs::from_vec(raw_msg.clone())?; + info!("loop {}: got {:x?}", self.log_prefix, msg); + match msg { + Message::ClientHsmFd(m) => { + self.client.write(msgs::ClientHsmFdReply {}).unwrap(); + let new_client = self.client.new_client(); + info!("new client {} -> {}", self.log_prefix, new_client.id()); + let peer_id = PublicKey::from_slice(&m.peer_id.0).expect("client pubkey"); // we don't expect a bad key from lightningd parent + let client_id = ClientId { + peer_id, + dbid: m.dbid, + }; + let mut new_loop = + SignerLoop::new_for_client(new_client, self.chan.sender.clone(), client_id); + thread::spawn(move || new_loop.start()); + } + Message::Memleak(_) => { + let reply = msgs::MemleakReply { result: false }; + self.client.write(reply)?; + } + _ => { + let reply = self.handle_message(raw_msg)?; + // Write the reply to the node + self.client.write_vec(reply)?; + info!("replied {}", self.log_prefix); + } + } + } + } + + fn handle_message(&mut self, message: Vec) -> Result> { + let dbid = self.client_id.as_ref().map(|c| c.dbid).unwrap_or(0); + let mut md = MsgDriver::new_empty(); + msgs::write_serial_request_header(&mut md, self.chan.sequence, dbid)?; + msgs::write_vec(&mut md, message)?; + let reply_rx = self.send_request(md.bytes())?; + let mut res = self.get_reply(reply_rx)?; + msgs::read_serial_response_header(&mut res, self.chan.sequence)?; + self.chan.sequence = self.chan.sequence.wrapping_add(1); + let reply = msgs::read_raw(&mut res)?; + Ok(reply) + } + + fn send_request(&mut self, message: Vec) -> Result> { + // Create a one-shot channel to receive the reply + let (reply_tx, reply_rx) = oneshot::channel(); + // Send a request to the MQTT handler to send to signer + let request = ChannelRequest { message, reply_tx }; + // This can fail if MQTT shuts down + self.chan + .sender + .blocking_send(request) + .map_err(|_| Error::Eof)?; + Ok(reply_rx) + } + + fn get_reply(&mut self, reply_rx: oneshot::Receiver) -> Result> { + // Wait for the signer reply + // Can fail if MQTT shuts down + let reply = reply_rx.blocking_recv().map_err(|_| Error::Eof)?; + Ok(reply.reply) + } +} diff --git a/tester/Cargo.toml b/tester/Cargo.toml new file mode 100644 index 0000000..ed2b329 --- /dev/null +++ b/tester/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "sphinx-key-tester" +version = "0.1.0" +authors = ["Evan Feenstra "] +edition = "2018" + +[dependencies] +sphinx-key-signer = { path = "../signer" } +sphinx-key-parser = { path = "../parser" } +vls-protocol = { path = "../../validating-lightning-signer/vls-protocol" } +vls-protocol-signer = { path = "../../validating-lightning-signer/vls-protocol-signer", default-features = false, features = ["secp-lowmemory", "vls-std"] } +anyhow = {version = "1", features = ["backtrace"]} +log = "0.4" +rumqttc = "0.12.0" +tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } +pretty_env_logger = "0.4.0" diff --git a/tester/src/main.rs b/tester/src/main.rs new file mode 100644 index 0000000..f73e223 --- /dev/null +++ b/tester/src/main.rs @@ -0,0 +1,66 @@ + +use sphinx_key_parser::MsgDriver; + +use rumqttc::{self, AsyncClient, MqttOptions, QoS, Event, Packet}; +use std::error::Error; +use std::time::Duration; +use vls_protocol::msgs; + +const SUB_TOPIC: &str = "sphinx"; +const PUB_TOPIC: &str = "sphinx-return"; +const USERNAME: &str = "sphinx-key"; +const PASSWORD: &str = "sphinx-key-pass"; + +#[tokio::main(worker_threads = 1)] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + // color_backtrace::install(); + + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + mqttoptions.set_credentials(USERNAME, PASSWORD); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + + client + .subscribe(SUB_TOPIC, QoS::AtMostOnce) + .await + .expect("could not mqtt subscribe"); + + client.publish(PUB_TOPIC, QoS::AtMostOnce, false, "READY".as_bytes().to_vec()).await.expect("could not pub"); + + loop { + let event = eventloop.poll().await; + // println!("{:?}", event.unwrap()); + if let Some(mut m) = incoming_msg(event.expect("failed to unwrap event")) { + let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header"); + println!("sequence {}", sequence); + println!("dbid {}", dbid); + let ping: msgs::Ping = + msgs::read_message(&mut m).expect("failed to read ping message"); + println!("INCOMING: {:?}", ping); + let mut md = MsgDriver::new_empty(); + msgs::write_serial_response_header(&mut md, sequence) + .expect("failed to write_serial_request_header"); + let pong = msgs::Pong { + id: ping.id, + message: ping.message + }; + msgs::write(&mut md, pong).expect("failed to serial write"); + client + .publish(PUB_TOPIC, QoS::AtMostOnce, false, md.bytes()) + .await + .expect("could not mqtt publish"); + } + } +} + +fn incoming_msg(event: Event) -> Option { + if let Event::Incoming(packet) = event { + if let Packet::Publish(p) = packet { + let m = MsgDriver::new(p.payload.to_vec()); + return Some(m) + } + } + None +} \ No newline at end of file