diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 5383455..20993de 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -2,11 +2,25 @@ name = "sphinx-key-broker" version = "0.1.0" edition = "2021" - +default-run = "sphinx-key-broker" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] rumqttd = "0.11.0" pretty_env_logger = "0.4.0" confy = "0.4.0" -tokio = "1.18" \ No newline at end of file +tokio = "1.18" +vls-protocol = { path = "../../validating-lightning-signer/vls-protocol" } +secp256k1 = { version = "0.20", features = ["rand-std", "bitcoin_hashes"] } +anyhow = {version = "1", features = ["backtrace"]} +log = "0.4" +rumqttc = "0.12.0" + +[features] +default = ["std"] +std = ["vls-protocol/std"] + +[[bin]] +name = "test_client" +path = "src/test_client.rs" +# ./target/debug/test_client \ No newline at end of file diff --git a/broker/src/main.rs b/broker/src/main.rs index c59ceee..d688cf1 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,47 +1,71 @@ +mod msg; + use librumqttd::{async_locallink::construct_broker, Config}; use std::thread; use tokio::time::{sleep, Duration}; +use vls_protocol::msgs::{self, Message}; +use vls_protocol::serde_bolt::WireString; +use tokio::sync::mpsc; + +const SUB_TOPIC: &str = "sphinx-return"; +const TRIGGER_TOPIC: &str = "trigger"; +const PUB_TOPIC: &str = "sphinx"; fn main() { - println!("start!"); pretty_env_logger::init(); - let config: Config = confy::load_path("config/rumqttd.conf").expect("no conf file"); + let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); let (mut router, console, servers, builder) = construct_broker(config); - println!("start router now"); thread::spawn(move || { - router.start().expect("could start broker"); + router.start().unwrap(); }); - let mut runtime = tokio::runtime::Builder::new_multi_thread(); - runtime.enable_all(); - runtime - .build() - .expect("tokio Builder failed") - .block_on(async { - let (mut tx, _) = builder - .connect("localclient", 200) - .await - .expect("couldnt connect"); + let mut rt = tokio::runtime::Builder::new_multi_thread(); + rt.enable_all(); + rt.build().unwrap().block_on(async { + let (msg_tx, mut msg_rx): (mpsc::UnboundedSender>, mpsc::UnboundedReceiver>) = mpsc::unbounded_channel(); + let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap(); + tx.subscribe([SUB_TOPIC, TRIGGER_TOPIC]).await.unwrap(); - let console_task = tokio::spawn(console); + let console_task = tokio::spawn(console); - tokio::spawn(async move { - for i in 0..10usize { - sleep(Duration::from_millis(1000)).await; - let topic = "hello/world"; - tx.publish(topic, false, vec![i as u8; 1]).await.unwrap(); - } - }); + let pub_task = tokio::spawn(async move { - println!("await now"); - - servers.await; - // pub_task.await.expect("FAIL pub task"); - // sub_task.await.expect("FAIL sub task"); - console_task.await.expect("FAIL console task"); - - println!("YOYO"); + while let Some(_) = msg_rx.recv().await { + let sequence = 0; + let mut md = msg::MsgDriver::new_empty(); + msgs::write_serial_request_header(&mut md, sequence, 0).expect("failed to write_serial_request_header"); + let ping = msgs::Ping { + id: 0, + message: WireString("ping".as_bytes().to_vec()), + }; + msgs::write(&mut md, ping).expect("failed to serial write"); + tx.publish(PUB_TOPIC, false, md.bytes()).await.unwrap(); + } }); + + let sub_task = tokio::spawn(async move { + loop { + let message = rx.recv().await.unwrap(); + // println!("T = {}, P = {:?}", message.topic, message.payload.len()); + // println!("count {}", message.payload.len()); + for payload in message.payload { + if let Err(e) = msg_tx.send(payload.to_vec()) { + println!("pub err {:?}", e); + } + } + } + }); + + servers.await; + println!("server awaited"); + pub_task.await.unwrap(); + println!("pub awaited"); + sub_task.await.unwrap(); + println!("sub awaited"); + console_task.await.unwrap(); + }); } + + diff --git a/broker/src/msg.rs b/broker/src/msg.rs new file mode 100644 index 0000000..30b312b --- /dev/null +++ b/broker/src/msg.rs @@ -0,0 +1,41 @@ +use vls_protocol::serde_bolt::{self, Read, Write}; + +pub struct MsgDriver(Vec); + +impl MsgDriver { + pub fn new(raw: Vec) -> Self { + Self(raw) + } + pub fn new_empty() -> Self { + Self(Vec::new()) + } + pub fn bytes(&self) -> Vec { + self.0.clone() + } +} + +impl Read for MsgDriver { + type Error = serde_bolt::Error; + + // input: buf to be written. Should already be the right size + fn read(&mut self, mut buf: &mut [u8]) -> serde_bolt::Result { + if buf.is_empty() { + return Ok(0); + } + let len = self.0.len(); + Ok(len) + } + + fn peek(&mut self) -> serde_bolt::Result> { + Ok(Some(0)) + } +} + +impl Write for MsgDriver { + type Error = serde_bolt::Error; + + fn write_all(&mut self, buf: &[u8]) -> serde_bolt::Result<()> { + self.0.extend(buf.iter().cloned()); + Ok(()) + } +} diff --git a/broker/src/test_client.rs b/broker/src/test_client.rs new file mode 100644 index 0000000..e975eb4 --- /dev/null +++ b/broker/src/test_client.rs @@ -0,0 +1,50 @@ +use tokio::{task, time}; + +use rumqttc::{self, AsyncClient, MqttOptions, QoS, Event, Packet}; +use std::error::Error; +use std::time::Duration; + +#[tokio::main(worker_threads = 1)] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + // color_backtrace::install(); + + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + mqttoptions.set_credentials("sphinx-key", "sphinx-key-pass"); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + task::spawn(async move { + requests(client).await; + time::sleep(Duration::from_secs(3)).await; + }); + + loop { + let event = eventloop.poll().await; + // println!("{:?}", event.unwrap()); + if let Event::Incoming(packet) = event.unwrap() { + if let Packet::Publish(p) = packet { + println!("incoming {:?}", p.payload); + } + } + } +} + +async fn requests(client: AsyncClient) { + + client + .subscribe("sphinx", QoS::AtMostOnce) + .await + .unwrap(); + + for _ in 1..=10 { + client + .publish("trigger", QoS::AtMostOnce, false, vec![1; 1]) + .await + .unwrap(); + + time::sleep(Duration::from_secs(1)).await; + } + + time::sleep(Duration::from_secs(120)).await; +} \ No newline at end of file diff --git a/sphinx-key/hmq.html b/sphinx-key/hmq.html index 8e534b8..fa5cc8b 100644 --- a/sphinx-key/hmq.html +++ b/sphinx-key/hmq.html @@ -104,8 +104,10 @@