diff --git a/tester/src/main.rs b/tester/src/main.rs index 5af024d..acd1eba 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -72,19 +72,21 @@ async fn main() -> Result<(), Box> { } }; + let vls_topic = format!("{}/{}", client_id, topics::VLS); client - .subscribe(topics::VLS, QoS::AtMostOnce) + .subscribe(vls_topic, QoS::AtMostOnce) .await .expect("could not mqtt subscribe"); + let ctrl_topic = format!("{}/{}", client_id, topics::CONTROL); client - .subscribe(topics::CONTROL, QoS::AtMostOnce) + .subscribe(ctrl_topic, QoS::AtMostOnce) .await .expect("could not mqtt subscribe"); if is_test { - run_test(eventloop, &client, ctrlr, is_log).await; + run_test(eventloop, &client, ctrlr, is_log, client_id).await; } else { - run_main(eventloop, &client, ctrlr, is_log, &seed, network).await; + run_main(eventloop, &client, ctrlr, is_log, &seed, network, client_id).await; } } } @@ -96,6 +98,7 @@ async fn run_main( is_log: bool, seed: &[u8], network: Network, + client_id: &str, ) { let store_path = env::var("STORE_PATH").unwrap_or(ROOT_STORE.to_string()); @@ -111,46 +114,52 @@ async fn run_main( println!("{:?}", event); if let Some((topic, msg_bytes)) = incoming_bytes(event) { println!("MSG BYTES {:}", msg_bytes.len()); - match topic.as_str() { - topics::VLS => { - match sphinx_signer::root::handle(&root_handler, msg_bytes, is_log) { - Ok(b) => client - .publish(topics::VLS_RETURN, QoS::AtMostOnce, false, b) + log::info!(">>> {}", topic.as_str()); + if topic.as_str().ends_with(topics::VLS) { + match sphinx_signer::root::handle(&root_handler, msg_bytes, is_log) { + Ok(b) => { + let ret_topic = format!("{}/{}", client_id, topics::VLS_RETURN); + client + .publish(ret_topic, QoS::AtMostOnce, false, b) .await - .expect("could not publish init response"), - Err(e) => client + .expect("could not publish init response"); + }, + Err(e) => { + let err_topic = format!("{}/{}", client_id, topics::ERROR); + client .publish( - topics::ERROR, + err_topic, QoS::AtMostOnce, false, e.to_string().as_bytes(), ) .await - .expect("could not publish error response"), - }; - } - topics::CONTROL => { - match ctrlr.handle(&msg_bytes) { - Ok((_msg, res)) => { - let res_data = rmp_serde::to_vec_named(&res) - .expect("could not build control response"); - client - .publish( - topics::CONTROL_RETURN, - QoS::AtMostOnce, - false, - res_data, - ) - .await - .expect("could not mqtt publish"); - } - Err(e) => log::warn!("error parsing ctrl msg {:?}", e), - }; - } - _ => log::info!("invalid topic"), + .expect("could not publish error response"); + } + }; + } else if topic.as_str().ends_with(topics::CONTROL) { + match ctrlr.handle(&msg_bytes) { + Ok((_msg, res)) => { + let res_data = rmp_serde::to_vec_named(&res) + .expect("could not build control response"); + let ctrl_ret_topic = format!("{}/{}", client_id, topics::CONTROL_RETURN); + client + .publish( + ctrl_ret_topic, + QoS::AtMostOnce, + false, + res_data, + ) + .await + .expect("could not mqtt publish"); + } + Err(e) => log::warn!("error parsing ctrl msg {:?}", e), + }; + } else { + log::info!("invalid topic"); } } - } + }, Err(e) => { log::warn!("diconnected {:?}", e); tokio::time::sleep(Duration::from_secs(1)).await; @@ -165,6 +174,7 @@ async fn run_test( client: &AsyncClient, mut ctrlr: Controller, is_log: bool, + client_id: &str, ) { // test handler loop loop { @@ -172,44 +182,45 @@ async fn run_test( Ok(event) => { println!("{:?}", event); if let Some((topic, msg_bytes)) = incoming_bytes(event) { - match topic.as_str() { - topics::VLS => { - let (ping, header) = - parser::request_from_bytes::(msg_bytes) - .expect("read ping header"); - if is_log { - println!("INCOMING: {:?}", ping); + log::info!(">>> {}", topic.as_str()); + if topic.as_str().ends_with(topics::VLS) { + let (ping, header) = + parser::request_from_bytes::(msg_bytes) + .expect("read ping header"); + if is_log { + println!("INCOMING: {:?}", ping); + } + let pong = msgs::Pong { + id: ping.id, + message: ping.message, + }; + let bytes = parser::raw_response_from_msg(pong, header.sequence) + .expect("couldnt parse raw response"); + let ret_topic = format!("{}/{}", client_id, topics::VLS_RETURN); + client + .publish(ret_topic, QoS::AtMostOnce, false, bytes) + .await + .expect("could not mqtt publish"); + } else if topic.as_str().ends_with(topics::CONTROL) { + match ctrlr.handle(&msg_bytes) { + Ok((_msg, res)) => { + let res_data = rmp_serde::to_vec_named(&res) + .expect("could not build control response"); + let ctrl_ret_topic = format!("{}/{}", client_id, topics::CONTROL_RETURN); + client + .publish( + ctrl_ret_topic, + QoS::AtMostOnce, + false, + res_data, + ) + .await + .expect("could not mqtt publish"); } - let pong = msgs::Pong { - id: ping.id, - message: ping.message, - }; - let bytes = parser::raw_response_from_msg(pong, header.sequence) - .expect("couldnt parse raw response"); - client - .publish(topics::VLS_RETURN, QoS::AtMostOnce, false, bytes) - .await - .expect("could not mqtt publish"); - } - topics::CONTROL => { - match ctrlr.handle(&msg_bytes) { - Ok((_msg, res)) => { - let res_data = rmp_serde::to_vec_named(&res) - .expect("could not build control response"); - client - .publish( - topics::CONTROL_RETURN, - QoS::AtMostOnce, - false, - res_data, - ) - .await - .expect("could not mqtt publish"); - } - Err(e) => log::warn!("error parsing ctrl msg {:?}", e), - }; - } - _ => log::info!("invalid topic"), + Err(e) => log::warn!("error parsing ctrl msg {:?}", e), + }; + } else { + log::info!("invalid topic"); } } }