From 937ff0d8deba40a6142f67024f6d9670fb47c121 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Thu, 9 Jun 2022 13:31:13 -0700 Subject: [PATCH] tokio select reconnect twice, but not more? --- broker/src/main.rs | 5 ---- broker/src/mqtt.rs | 62 ++++++++++++++++++++++-------------------- broker/src/run_test.rs | 12 +++++--- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/broker/src/main.rs b/broker/src/main.rs index ad392cb..a67d54f 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -30,11 +30,6 @@ pub struct ChannelReply { fn main() -> anyhow::Result<()> { let parent_fd = open_parent_fd(); - /* - simple_logger::SimpleLogger::new() - .with_utc_timestamps() - .with_module_level("async_io", log::LevelFilter::Off) - */ util::setup_logging("hsmd ", "info"); let app = App::new("signer") .setting(AppSettings::NoAutoVersion) diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 712aa74..fe6b61a 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -80,43 +80,44 @@ pub fn start_broker( let sub_task = tokio::spawn(async move { // ready message loop // let ready_tx_ = ready_tx.clone(); + // loop { + // wait for CONNECTED + // loop { + // let status = status_rx.recv().await.unwrap(); + // if status { + // break; + // } + // } + // now wait for READY + // loop { + // let message = rx.recv().await.unwrap(); + // if let Some(payload) = message.payload.get(0) { + // let content = String::from_utf8_lossy(&payload[..]); + // log::info!("received message content: {}", content); + // if content == "READY" { + // // ready_tx.send(true).expect("could not send ready"); + // break; + // } + // } + // } + // now start parsing... or break for DISCONNECT + // println!("OK START PARSING!"); loop { - // wait for CONNECTED - // loop { - // let status = status_rx.recv().await.unwrap(); - // if status { - // break; - // } - // } - // now wait for READY - // loop { - // let message = rx.recv().await.unwrap(); - // if let Some(payload) = message.payload.get(0) { - // let content = String::from_utf8_lossy(&payload[..]); - // log::info!("received message content: {}", content); - // if content == "READY" { - // // ready_tx.send(true).expect("could not send ready"); - // break; - // } - // } - // } - // now start parsing... or break for DISCONNECT - println!("OK START PARSING!"); - loop { - let message = rx.recv().await.unwrap(); - println!("T = {}, P = {:?}", message.topic, message.payload.len()); - // println!("count {}", message.payload.len()); - for payload in message.payload { - if let Err(e) = msg_tx.send(payload.to_vec()).await { - println!("pub err {:?}", e); - } + let message = rx.recv().await.unwrap(); + println!("T = {}, P = {:?}", message.topic, message.payload.len()); + // println!("count {}", message.payload.len()); + for payload in message.payload { + if let Err(e) = msg_tx.send(payload.to_vec()).await { + println!("pub err {:?}", e); } } } + // } }); let relay_task = tokio::spawn(async move { - while let Some(msg) = receiver.recv().await { + loop { + let msg = receiver.recv().await.unwrap(); tx.publish(PUB_TOPIC, false, msg.message) .await .expect("could not mqtt pub"); @@ -125,6 +126,7 @@ pub fn start_broker( log::warn!("could not send on reply_tx"); } } + // println!("ABORT! relay task finished <<<<<<<<<<<<<<<"); }); servers.await; diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 865d570..db86a8f 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -22,17 +22,21 @@ pub fn run_test() { status = status_rx.recv() => { // println!("got a status"); if let Some(connection_status) = status { - connected = connected; + connected = connection_status; + id = 0; + sequence = 1; println!("========> CONNETED! {}", connection_status); } } res = iteration(id, sequence, tx.clone(), connected) => { - println!("iteration! connected: {}", connected); + println!("iteration! {}", connected); if let Err(e) = res { panic!("iteration failed {:?}", e); } - sequence = sequence.wrapping_add(1); - id += 1; + if connected { + sequence = sequence.wrapping_add(1); + id += 1; + } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } };