fix tester topics

This commit is contained in:
Evan Feenstra
2023-05-24 16:39:35 -07:00
parent 877c9b8a83
commit d447ea4306

View File

@@ -72,19 +72,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
} }
}; };
let vls_topic = format!("{}/{}", client_id, topics::VLS);
client client
.subscribe(topics::VLS, QoS::AtMostOnce) .subscribe(vls_topic, QoS::AtMostOnce)
.await .await
.expect("could not mqtt subscribe"); .expect("could not mqtt subscribe");
let ctrl_topic = format!("{}/{}", client_id, topics::CONTROL);
client client
.subscribe(topics::CONTROL, QoS::AtMostOnce) .subscribe(ctrl_topic, QoS::AtMostOnce)
.await .await
.expect("could not mqtt subscribe"); .expect("could not mqtt subscribe");
if is_test { if is_test {
run_test(eventloop, &client, ctrlr, is_log).await; run_test(eventloop, &client, ctrlr, is_log, client_id).await;
} else { } else {
run_main(eventloop, &client, ctrlr, is_log, &seed, network).await; run_main(eventloop, &client, ctrlr, is_log, &seed, network, client_id).await;
} }
} }
} }
@@ -96,6 +98,7 @@ async fn run_main(
is_log: bool, is_log: bool,
seed: &[u8], seed: &[u8],
network: Network, network: Network,
client_id: &str,
) { ) {
let store_path = env::var("STORE_PATH").unwrap_or(ROOT_STORE.to_string()); let store_path = env::var("STORE_PATH").unwrap_or(ROOT_STORE.to_string());
@@ -111,46 +114,52 @@ async fn run_main(
println!("{:?}", event); println!("{:?}", event);
if let Some((topic, msg_bytes)) = incoming_bytes(event) { if let Some((topic, msg_bytes)) = incoming_bytes(event) {
println!("MSG BYTES {:}", msg_bytes.len()); println!("MSG BYTES {:}", msg_bytes.len());
match topic.as_str() { log::info!(">>> {}", topic.as_str());
topics::VLS => { if topic.as_str().ends_with(topics::VLS) {
match sphinx_signer::root::handle(&root_handler, msg_bytes, is_log) { match sphinx_signer::root::handle(&root_handler, msg_bytes, is_log) {
Ok(b) => client Ok(b) => {
.publish(topics::VLS_RETURN, QoS::AtMostOnce, false, b) let ret_topic = format!("{}/{}", client_id, topics::VLS_RETURN);
client
.publish(ret_topic, QoS::AtMostOnce, false, b)
.await .await
.expect("could not publish init response"), .expect("could not publish init response");
Err(e) => client },
Err(e) => {
let err_topic = format!("{}/{}", client_id, topics::ERROR);
client
.publish( .publish(
topics::ERROR, err_topic,
QoS::AtMostOnce, QoS::AtMostOnce,
false, false,
e.to_string().as_bytes(), e.to_string().as_bytes(),
) )
.await .await
.expect("could not publish error response"), .expect("could not publish error response");
}; }
} };
topics::CONTROL => { } else if topic.as_str().ends_with(topics::CONTROL) {
match ctrlr.handle(&msg_bytes) { match ctrlr.handle(&msg_bytes) {
Ok((_msg, res)) => { Ok((_msg, res)) => {
let res_data = rmp_serde::to_vec_named(&res) let res_data = rmp_serde::to_vec_named(&res)
.expect("could not build control response"); .expect("could not build control response");
client let ctrl_ret_topic = format!("{}/{}", client_id, topics::CONTROL_RETURN);
.publish( client
topics::CONTROL_RETURN, .publish(
QoS::AtMostOnce, ctrl_ret_topic,
false, QoS::AtMostOnce,
res_data, false,
) res_data,
.await )
.expect("could not mqtt publish"); .await
} .expect("could not mqtt publish");
Err(e) => log::warn!("error parsing ctrl msg {:?}", e), }
}; Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
} };
_ => log::info!("invalid topic"), } else {
log::info!("invalid topic");
} }
} }
} },
Err(e) => { Err(e) => {
log::warn!("diconnected {:?}", e); log::warn!("diconnected {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
@@ -165,6 +174,7 @@ async fn run_test(
client: &AsyncClient, client: &AsyncClient,
mut ctrlr: Controller, mut ctrlr: Controller,
is_log: bool, is_log: bool,
client_id: &str,
) { ) {
// test handler loop // test handler loop
loop { loop {
@@ -172,44 +182,45 @@ async fn run_test(
Ok(event) => { Ok(event) => {
println!("{:?}", event); println!("{:?}", event);
if let Some((topic, msg_bytes)) = incoming_bytes(event) { if let Some((topic, msg_bytes)) = incoming_bytes(event) {
match topic.as_str() { log::info!(">>> {}", topic.as_str());
topics::VLS => { if topic.as_str().ends_with(topics::VLS) {
let (ping, header) = let (ping, header) =
parser::request_from_bytes::<msgs::Ping>(msg_bytes) parser::request_from_bytes::<msgs::Ping>(msg_bytes)
.expect("read ping header"); .expect("read ping header");
if is_log { if is_log {
println!("INCOMING: {:?}", ping); println!("INCOMING: {:?}", ping);
}
let pong = msgs::Pong {
id: ping.id,
message: ping.message,
};
let bytes = parser::raw_response_from_msg(pong, header.sequence)
.expect("couldnt parse raw response");
let ret_topic = format!("{}/{}", client_id, topics::VLS_RETURN);
client
.publish(ret_topic, QoS::AtMostOnce, false, bytes)
.await
.expect("could not mqtt publish");
} else if topic.as_str().ends_with(topics::CONTROL) {
match ctrlr.handle(&msg_bytes) {
Ok((_msg, res)) => {
let res_data = rmp_serde::to_vec_named(&res)
.expect("could not build control response");
let ctrl_ret_topic = format!("{}/{}", client_id, topics::CONTROL_RETURN);
client
.publish(
ctrl_ret_topic,
QoS::AtMostOnce,
false,
res_data,
)
.await
.expect("could not mqtt publish");
} }
let pong = msgs::Pong { Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
id: ping.id, };
message: ping.message, } else {
}; log::info!("invalid topic");
let bytes = parser::raw_response_from_msg(pong, header.sequence)
.expect("couldnt parse raw response");
client
.publish(topics::VLS_RETURN, QoS::AtMostOnce, false, bytes)
.await
.expect("could not mqtt publish");
}
topics::CONTROL => {
match ctrlr.handle(&msg_bytes) {
Ok((_msg, res)) => {
let res_data = rmp_serde::to_vec_named(&res)
.expect("could not build control response");
client
.publish(
topics::CONTROL_RETURN,
QoS::AtMostOnce,
false,
res_data,
)
.await
.expect("could not mqtt publish");
}
Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
};
}
_ => log::info!("invalid topic"),
} }
} }
} }