vls protocol broker and test client

This commit is contained in:
Evan Feenstra
2022-06-01 20:21:10 -07:00
parent 971671c906
commit 1872af3687
5 changed files with 173 additions and 34 deletions

View File

@@ -1,47 +1,71 @@
mod msg;
use librumqttd::{async_locallink::construct_broker, Config};
use std::thread;
use tokio::time::{sleep, Duration};
use vls_protocol::msgs::{self, Message};
use vls_protocol::serde_bolt::WireString;
use tokio::sync::mpsc;
const SUB_TOPIC: &str = "sphinx-return";
const TRIGGER_TOPIC: &str = "trigger";
const PUB_TOPIC: &str = "sphinx";
fn main() {
println!("start!");
pretty_env_logger::init();
let config: Config = confy::load_path("config/rumqttd.conf").expect("no conf file");
let config: Config = confy::load_path("config/rumqttd.conf").unwrap();
let (mut router, console, servers, builder) = construct_broker(config);
println!("start router now");
thread::spawn(move || {
router.start().expect("could start broker");
router.start().unwrap();
});
let mut runtime = tokio::runtime::Builder::new_multi_thread();
runtime.enable_all();
runtime
.build()
.expect("tokio Builder failed")
.block_on(async {
let (mut tx, _) = builder
.connect("localclient", 200)
.await
.expect("couldnt connect");
let mut rt = tokio::runtime::Builder::new_multi_thread();
rt.enable_all();
rt.build().unwrap().block_on(async {
let (msg_tx, mut msg_rx): (mpsc::UnboundedSender<Vec<u8>>, mpsc::UnboundedReceiver<Vec<u8>>) = mpsc::unbounded_channel();
let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap();
tx.subscribe([SUB_TOPIC, TRIGGER_TOPIC]).await.unwrap();
let console_task = tokio::spawn(console);
let console_task = tokio::spawn(console);
tokio::spawn(async move {
for i in 0..10usize {
sleep(Duration::from_millis(1000)).await;
let topic = "hello/world";
tx.publish(topic, false, vec![i as u8; 1]).await.unwrap();
}
});
let pub_task = tokio::spawn(async move {
println!("await now");
servers.await;
// pub_task.await.expect("FAIL pub task");
// sub_task.await.expect("FAIL sub task");
console_task.await.expect("FAIL console task");
println!("YOYO");
while let Some(_) = msg_rx.recv().await {
let sequence = 0;
let mut md = msg::MsgDriver::new_empty();
msgs::write_serial_request_header(&mut md, sequence, 0).expect("failed to write_serial_request_header");
let ping = msgs::Ping {
id: 0,
message: WireString("ping".as_bytes().to_vec()),
};
msgs::write(&mut md, ping).expect("failed to serial write");
tx.publish(PUB_TOPIC, false, md.bytes()).await.unwrap();
}
});
let sub_task = tokio::spawn(async move {
loop {
let message = rx.recv().await.unwrap();
// println!("T = {}, P = {:?}", message.topic, message.payload.len());
// println!("count {}", message.payload.len());
for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()) {
println!("pub err {:?}", e);
}
}
}
});
servers.await;
println!("server awaited");
pub_task.await.unwrap();
println!("pub awaited");
sub_task.await.unwrap();
println!("sub awaited");
console_task.await.unwrap();
});
}