tokio select reconnect twice, but not more?

This commit is contained in:
Evan Feenstra
2022-06-09 13:31:13 -07:00
parent 895f79f61e
commit 937ff0d8de
3 changed files with 40 additions and 39 deletions

View File

@@ -30,11 +30,6 @@ pub struct ChannelReply {
fn main() -> anyhow::Result<()> {
let parent_fd = open_parent_fd();
/*
simple_logger::SimpleLogger::new()
.with_utc_timestamps()
.with_module_level("async_io", log::LevelFilter::Off)
*/
util::setup_logging("hsmd ", "info");
let app = App::new("signer")
.setting(AppSettings::NoAutoVersion)

View File

@@ -80,43 +80,44 @@ pub fn start_broker(
let sub_task = tokio::spawn(async move {
// ready message loop
// let ready_tx_ = ready_tx.clone();
// loop {
// wait for CONNECTED
// loop {
// let status = status_rx.recv().await.unwrap();
// if status {
// break;
// }
// }
// now wait for READY
// loop {
// let message = rx.recv().await.unwrap();
// if let Some(payload) = message.payload.get(0) {
// let content = String::from_utf8_lossy(&payload[..]);
// log::info!("received message content: {}", content);
// if content == "READY" {
// // ready_tx.send(true).expect("could not send ready");
// break;
// }
// }
// }
// now start parsing... or break for DISCONNECT
// println!("OK START PARSING!");
loop {
// wait for CONNECTED
// loop {
// let status = status_rx.recv().await.unwrap();
// if status {
// break;
// }
// }
// now wait for READY
// loop {
// let message = rx.recv().await.unwrap();
// if let Some(payload) = message.payload.get(0) {
// let content = String::from_utf8_lossy(&payload[..]);
// log::info!("received message content: {}", content);
// if content == "READY" {
// // ready_tx.send(true).expect("could not send ready");
// break;
// }
// }
// }
// now start parsing... or break for DISCONNECT
println!("OK START PARSING!");
loop {
let message = rx.recv().await.unwrap();
println!("T = {}, P = {:?}", message.topic, message.payload.len());
// println!("count {}", message.payload.len());
for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()).await {
println!("pub err {:?}", e);
}
let message = rx.recv().await.unwrap();
println!("T = {}, P = {:?}", message.topic, message.payload.len());
// println!("count {}", message.payload.len());
for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()).await {
println!("pub err {:?}", e);
}
}
}
// }
});
let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
loop {
let msg = receiver.recv().await.unwrap();
tx.publish(PUB_TOPIC, false, msg.message)
.await
.expect("could not mqtt pub");
@@ -125,6 +126,7 @@ pub fn start_broker(
log::warn!("could not send on reply_tx");
}
}
// println!("ABORT! relay task finished <<<<<<<<<<<<<<<");
});
servers.await;

View File

@@ -22,17 +22,21 @@ pub fn run_test() {
status = status_rx.recv() => {
// println!("got a status");
if let Some(connection_status) = status {
connected = connected;
connected = connection_status;
id = 0;
sequence = 1;
println!("========> CONNETED! {}", connection_status);
}
}
res = iteration(id, sequence, tx.clone(), connected) => {
println!("iteration! connected: {}", connected);
println!("iteration! {}", connected);
if let Err(e) = res {
panic!("iteration failed {:?}", e);
}
sequence = sequence.wrapping_add(1);
id += 1;
if connected {
sequence = sequence.wrapping_add(1);
id += 1;
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
};