broker VLS unix connection, real ping test, clap cli utils, virtual VLS test client, mqtt relay channels

This commit is contained in:
Evan Feenstra
2022-06-03 12:46:19 -07:00
parent 7f47f2b525
commit d19bb5adc6
9 changed files with 436 additions and 66 deletions

View File

@@ -3,7 +3,8 @@
members = [
"signer",
"broker",
"parser"
"parser",
"tester"
]
exclude = [

View File

@@ -9,13 +9,17 @@ default-run = "sphinx-key-broker"
rumqttd = "0.11.0"
pretty_env_logger = "0.4.0"
confy = "0.4.0"
tokio = "1.18"
tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] }
vls-protocol = { path = "../../validating-lightning-signer/vls-protocol" }
vls-proxy = { path = "../../validating-lightning-signer/vls-proxy" }
sphinx-key-parser = { path = "../parser" }
secp256k1 = { version = "0.20", features = ["rand-std", "bitcoin_hashes"] }
anyhow = {version = "1", features = ["backtrace"]}
log = "0.4"
fern = { version = "0.6", features = ["colored"] }
rumqttc = "0.12.0"
clap = "=3.0.0-beta.2"
clap_derive = "=3.0.0-beta.5"
[features]
default = ["std"]

View File

@@ -1,68 +1,68 @@
use sphinx_key_parser::MsgDriver;
use librumqttd::{async_locallink::construct_broker, Config};
use std::thread;
use vls_protocol::msgs;
use vls_protocol::serde_bolt::WireString;
use tokio::sync::mpsc;
mod mqtt;
mod run_test;
mod unix_fd;
const SUB_TOPIC: &str = "sphinx-return";
const TRIGGER_TOPIC: &str = "trigger";
const PUB_TOPIC: &str = "sphinx";
use crate::unix_fd::SignerLoop;
use clap::{App, AppSettings, Arg};
use crate::mqtt::start_broker;
use std::env;
use tokio::sync::{mpsc, oneshot};
use vls_proxy::client::UnixClient;
use vls_proxy::connection::{open_parent_fd, UnixConnection};
use vls_proxy::util::setup_logging;
fn main() {
pretty_env_logger::init();
let config: Config = confy::load_path("config/rumqttd.conf").unwrap();
let (mut router, console, servers, builder) = construct_broker(config);
thread::spawn(move || {
router.start().unwrap();
});
let mut rt = tokio::runtime::Builder::new_multi_thread();
rt.enable_all();
rt.build().unwrap().block_on(async {
let (msg_tx, mut msg_rx): (mpsc::UnboundedSender<Vec<u8>>, mpsc::UnboundedReceiver<Vec<u8>>) = mpsc::unbounded_channel();
let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap();
tx.subscribe([SUB_TOPIC, TRIGGER_TOPIC]).await.unwrap();
let console_task = tokio::spawn(console);
let pub_task = tokio::spawn(async move {
while let Some(_) = msg_rx.recv().await {
let sequence = 0;
let mut md = MsgDriver::new_empty();
msgs::write_serial_request_header(&mut md, sequence, 0).expect("failed to write_serial_request_header");
let ping = msgs::Ping {
id: 0,
message: WireString("ping".as_bytes().to_vec()),
};
msgs::write(&mut md, ping).expect("failed to serial write");
tx.publish(PUB_TOPIC, false, md.bytes()).await.unwrap();
}
});
let sub_task = tokio::spawn(async move {
loop {
let message = rx.recv().await.unwrap();
// println!("T = {}, P = {:?}", message.topic, message.payload.len());
// println!("count {}", message.payload.len());
for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()) {
println!("pub err {:?}", e);
}
}
}
});
servers.await;
println!("server awaited");
pub_task.await.unwrap();
println!("pub awaited");
sub_task.await.unwrap();
println!("sub awaited");
console_task.await.unwrap();
});
pub struct Channel {
pub sequence: u16,
pub sender: mpsc::Sender<ChannelRequest>,
}
/// Responses are received on the oneshot sender
pub struct ChannelRequest {
pub message: Vec<u8>,
pub reply_tx: oneshot::Sender<ChannelReply>,
}
// mpsc reply
pub struct ChannelReply {
pub reply: Vec<u8>,
}
fn main() -> anyhow::Result<()> {
let parent_fd = open_parent_fd();
setup_logging("hsmd ", "info");
let app = App::new("signer")
.setting(AppSettings::NoAutoVersion)
.about("CLN:mqtt - connects to an embedded VLS over a MQTT connection")
.arg(
Arg::new("--dev-disconnect")
.about("ignored dev flag")
.long("dev-disconnect")
.takes_value(true),
)
.arg(Arg::from("--log-io ignored dev flag"))
.arg(Arg::from("--version show a dummy version"))
.arg(Arg::from("--test run a test against the embedded device"));
let matches = app.get_matches();
if matches.is_present("version") {
// Pretend to be the right version, given to us by an env var
let version =
env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning");
println!("{}", version);
return Ok(());
}
if matches.is_present("test") {
run_test::run_test();
} else {
let (tx, rx) = mpsc::channel(1000);
let _runtime = start_broker(true, rx);
// listen to reqs from CLN
let conn = UnixConnection::new(parent_fd);
let client = UnixClient::new(conn);
let mut signer_loop = SignerLoop::new(client, tx);
signer_loop.start();
}
Ok(())
}

81
broker/src/mqtt.rs Normal file
View File

@@ -0,0 +1,81 @@
use crate::{ChannelRequest,ChannelReply};
use librumqttd::{async_locallink::construct_broker, Config};
use std::thread;
use tokio::sync::{oneshot, mpsc};
const SUB_TOPIC: &str = "sphinx-return";
const PUB_TOPIC: &str = "sphinx";
pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver<ChannelRequest>) -> tokio::runtime::Runtime {
let config: Config = confy::load_path("config/rumqttd.conf").unwrap();
let (mut router, console, servers, builder) = construct_broker(config);
thread::spawn(move || {
router.start().expect("could not start router");
});
let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
rt_builder.enable_all();
let rt = rt_builder.build().unwrap();
rt.block_on(async {
// channel to block until READY received
let (ready_tx, ready_rx) = oneshot::channel();
tokio::spawn(async move {
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = mpsc::channel(1000);
let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap();
tx.subscribe([SUB_TOPIC]).await.unwrap();
let console_task = tokio::spawn(console);
let sub_task = tokio::spawn(async move {
// ready message loop
// let ready_tx_ = ready_tx.clone();
if wait_for_ready_message {
loop {
let message = rx.recv().await.unwrap();
if let Some(payload) = message.payload.get(0) {
let content = String::from_utf8_lossy(&payload[..]);
if content == "READY" {
ready_tx.send(true).expect("could not send ready");
break;
}
}
}
}
loop {
let message = rx.recv().await.unwrap();
// println!("T = {}, P = {:?}", message.topic, message.payload.len());
// println!("count {}", message.payload.len());
for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()).await {
println!("pub err {:?}", e);
}
}
}
});
let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
tx.publish(PUB_TOPIC, false, msg.message).await.expect("could not mqtt pub");
let reply = msg_rx.recv().await.expect("could not unwrap msg_rx.recv()");
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) {
log::warn!("could not send on reply_tx");
}
}
});
servers.await;
sub_task.await.unwrap();
relay_task.await.unwrap();
console_task.await.unwrap();
});
if wait_for_ready_message {
log::info!("waiting for READY...");
ready_rx.await.expect("Could not receive from channel.");
}
});
rt
}

66
broker/src/run_test.rs Normal file
View File

@@ -0,0 +1,66 @@
use crate::mqtt::start_broker;
use crate::ChannelRequest;
use sphinx_key_parser::MsgDriver;
use tokio::sync::{mpsc, oneshot};
use vls_protocol::serde_bolt::WireString;
use vls_protocol::{msgs, msgs::Message};
pub fn run_test() {
log::info!("TEST...");
let mut id = 0u16;
let mut sequence = 1;
let (tx, rx) = mpsc::channel(1000);
let runtime = start_broker(true, rx);
log::info!("======> READY received! start now");
runtime.block_on(async {
loop {
if let Err(e) = iteration(id, sequence, tx.clone()).await {
panic!("iteration failed {:?}", e);
}
sequence = sequence.wrapping_add(1);
id += 1;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
});
}
pub async fn iteration(
id: u16,
sequence: u16,
tx: mpsc::Sender<ChannelRequest>,
) -> anyhow::Result<()> {
let mut md = MsgDriver::new_empty();
msgs::write_serial_request_header(&mut md, sequence, 0)?;
let ping = msgs::Ping {
id,
message: WireString("ping".as_bytes().to_vec()),
};
msgs::write(&mut md, ping)?;
let (reply_tx, reply_rx) = oneshot::channel();
// Send a request to the MQTT handler to send to signer
let request = ChannelRequest {
message: md.bytes(),
reply_tx,
};
let _ = tx.send(request).await;
let res = reply_rx.await?;
let mut ret = MsgDriver::new(res.reply);
msgs::read_serial_response_header(&mut ret, sequence)?;
let reply = msgs::read(&mut ret)?;
match reply {
Message::Pong(p) => {
log::info!(
"got reply {} {}",
p.id,
String::from_utf8(p.message.0).unwrap()
);
assert_eq!(p.id, id);
}
_ => {
panic!("unknown response");
}
}
Ok(())
}

View File

@@ -27,7 +27,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
// println!("{:?}", event.unwrap());
if let Event::Incoming(packet) = event.unwrap() {
if let Packet::Publish(p) = packet {
// println!("incoming {:?}", p.payload);
let mut m = MsgDriver::new(p.payload.to_vec());
let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header");
assert_eq!(dbid, 0);

137
broker/src/unix_fd.rs Normal file
View File

@@ -0,0 +1,137 @@
use log::*;
use secp256k1::PublicKey;
use sphinx_key_parser::MsgDriver;
use std::thread;
use tokio::sync::{mpsc, oneshot};
// use tokio::task::spawn_blocking;
use crate::{Channel, ChannelReply, ChannelRequest};
use vls_protocol::{msgs, msgs::Message, Error, Result};
use vls_proxy::client::Client;
#[derive(Clone, Debug)]
pub struct ClientId {
pub peer_id: PublicKey,
pub dbid: u64,
}
impl Channel {
pub fn new(sender: mpsc::Sender<ChannelRequest>) -> Self {
Self {
sender,
sequence: 0,
}
}
}
/// Implement the hsmd UNIX fd protocol.
pub struct SignerLoop<C: 'static + Client> {
client: C,
log_prefix: String,
chan: Channel,
client_id: Option<ClientId>,
}
impl<C: 'static + Client> SignerLoop<C> {
/// Create a loop for the root (lightningd) connection, but doesn't start it yet
pub fn new(client: C, sender: mpsc::Sender<ChannelRequest>) -> Self {
let log_prefix = format!("{}/{}", std::process::id(), client.id());
Self {
client,
log_prefix,
chan: Channel::new(sender),
client_id: None,
}
}
// Create a loop for a non-root connection
fn new_for_client(
client: C,
sender: mpsc::Sender<ChannelRequest>,
client_id: ClientId,
) -> Self {
let log_prefix = format!("{}/{}", std::process::id(), client.id());
Self {
client,
log_prefix,
chan: Channel::new(sender),
client_id: Some(client_id),
}
}
/// Start the read loop
pub fn start(&mut self) {
info!("loop {}: start", self.log_prefix);
match self.do_loop() {
Ok(()) => info!("loop {}: done", self.log_prefix),
Err(Error::Eof) => info!("loop {}: ending", self.log_prefix),
Err(e) => error!("loop {}: error {:?}", self.log_prefix, e),
}
}
fn do_loop(&mut self) -> Result<()> {
loop {
let raw_msg = self.client.read_raw()?;
debug!("loop {}: got raw", self.log_prefix);
let msg = msgs::from_vec(raw_msg.clone())?;
info!("loop {}: got {:x?}", self.log_prefix, msg);
match msg {
Message::ClientHsmFd(m) => {
self.client.write(msgs::ClientHsmFdReply {}).unwrap();
let new_client = self.client.new_client();
info!("new client {} -> {}", self.log_prefix, new_client.id());
let peer_id = PublicKey::from_slice(&m.peer_id.0).expect("client pubkey"); // we don't expect a bad key from lightningd parent
let client_id = ClientId {
peer_id,
dbid: m.dbid,
};
let mut new_loop =
SignerLoop::new_for_client(new_client, self.chan.sender.clone(), client_id);
thread::spawn(move || new_loop.start());
}
Message::Memleak(_) => {
let reply = msgs::MemleakReply { result: false };
self.client.write(reply)?;
}
_ => {
let reply = self.handle_message(raw_msg)?;
// Write the reply to the node
self.client.write_vec(reply)?;
info!("replied {}", self.log_prefix);
}
}
}
}
fn handle_message(&mut self, message: Vec<u8>) -> Result<Vec<u8>> {
let dbid = self.client_id.as_ref().map(|c| c.dbid).unwrap_or(0);
let mut md = MsgDriver::new_empty();
msgs::write_serial_request_header(&mut md, self.chan.sequence, dbid)?;
msgs::write_vec(&mut md, message)?;
let reply_rx = self.send_request(md.bytes())?;
let mut res = self.get_reply(reply_rx)?;
msgs::read_serial_response_header(&mut res, self.chan.sequence)?;
self.chan.sequence = self.chan.sequence.wrapping_add(1);
let reply = msgs::read_raw(&mut res)?;
Ok(reply)
}
fn send_request(&mut self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
// Create a one-shot channel to receive the reply
let (reply_tx, reply_rx) = oneshot::channel();
// Send a request to the MQTT handler to send to signer
let request = ChannelRequest { message, reply_tx };
// This can fail if MQTT shuts down
self.chan
.sender
.blocking_send(request)
.map_err(|_| Error::Eof)?;
Ok(reply_rx)
}
fn get_reply(&mut self, reply_rx: oneshot::Receiver<ChannelReply>) -> Result<Vec<u8>> {
// Wait for the signer reply
// Can fail if MQTT shuts down
let reply = reply_rx.blocking_recv().map_err(|_| Error::Eof)?;
Ok(reply.reply)
}
}

16
tester/Cargo.toml Normal file
View File

@@ -0,0 +1,16 @@
[package]
name = "sphinx-key-tester"
version = "0.1.0"
authors = ["Evan Feenstra <evanfeenstra@gmail.com>"]
edition = "2018"
[dependencies]
sphinx-key-signer = { path = "../signer" }
sphinx-key-parser = { path = "../parser" }
vls-protocol = { path = "../../validating-lightning-signer/vls-protocol" }
vls-protocol-signer = { path = "../../validating-lightning-signer/vls-protocol-signer", default-features = false, features = ["secp-lowmemory", "vls-std"] }
anyhow = {version = "1", features = ["backtrace"]}
log = "0.4"
rumqttc = "0.12.0"
tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] }
pretty_env_logger = "0.4.0"

66
tester/src/main.rs Normal file
View File

@@ -0,0 +1,66 @@
use sphinx_key_parser::MsgDriver;
use rumqttc::{self, AsyncClient, MqttOptions, QoS, Event, Packet};
use std::error::Error;
use std::time::Duration;
use vls_protocol::msgs;
const SUB_TOPIC: &str = "sphinx";
const PUB_TOPIC: &str = "sphinx-return";
const USERNAME: &str = "sphinx-key";
const PASSWORD: &str = "sphinx-key-pass";
#[tokio::main(worker_threads = 1)]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_credentials(USERNAME, PASSWORD);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client
.subscribe(SUB_TOPIC, QoS::AtMostOnce)
.await
.expect("could not mqtt subscribe");
client.publish(PUB_TOPIC, QoS::AtMostOnce, false, "READY".as_bytes().to_vec()).await.expect("could not pub");
loop {
let event = eventloop.poll().await;
// println!("{:?}", event.unwrap());
if let Some(mut m) = incoming_msg(event.expect("failed to unwrap event")) {
let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header");
println!("sequence {}", sequence);
println!("dbid {}", dbid);
let ping: msgs::Ping =
msgs::read_message(&mut m).expect("failed to read ping message");
println!("INCOMING: {:?}", ping);
let mut md = MsgDriver::new_empty();
msgs::write_serial_response_header(&mut md, sequence)
.expect("failed to write_serial_request_header");
let pong = msgs::Pong {
id: ping.id,
message: ping.message
};
msgs::write(&mut md, pong).expect("failed to serial write");
client
.publish(PUB_TOPIC, QoS::AtMostOnce, false, md.bytes())
.await
.expect("could not mqtt publish");
}
}
}
fn incoming_msg(event: Event) -> Option<MsgDriver> {
if let Event::Incoming(packet) = event {
if let Packet::Publish(p) = packet {
let m = MsgDriver::new(p.payload.to_vec());
return Some(m)
}
}
None
}