From f1e8e13dda9d8f1752f0749415aa5631019e69e9 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Thu, 9 Jun 2022 19:23:40 -0700 Subject: [PATCH] broker reconnect working --- broker/src/main.rs | 4 ++- broker/src/mqtt.rs | 77 +++++++++++++++--------------------------- broker/src/run_test.rs | 11 +++--- 3 files changed, 34 insertions(+), 58 deletions(-) diff --git a/broker/src/main.rs b/broker/src/main.rs index a67d54f..eed32e5 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -17,12 +17,14 @@ pub struct Channel { } /// Responses are received on the oneshot sender +#[derive(Debug)] pub struct ChannelRequest { pub message: Vec, pub reply_tx: oneshot::Sender, } // mpsc reply +#[derive(Debug)] pub struct ChannelReply { pub reply: Vec, } @@ -48,7 +50,7 @@ fn main() -> anyhow::Result<()> { // Pretend to be the right version, given to us by an env var let version = env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning"); - println!("{}", version); + log::info!("{}", version); return Ok(()); } diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index fe6b61a..15340bb 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use std::thread; use std::time::Duration; use tokio::sync::mpsc; +use tokio::time::timeout; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; @@ -48,85 +49,61 @@ pub fn start_broker( let console: Arc = Arc::new(ConsoleLink::new(config, router_tx)); loop { let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); - match metrics.tracker() { + let changed: Option = match metrics.tracker() { Some(t) => { // wait for subscription to be sure if t.concrete_subscriptions_len() > 0 { if !client_connected { - println!("CLIENT CONNECTED!"); - client_connected = true; - status_sender - .send(true) - .await - .expect("couldnt send true statu"); + Some(true) // changed to true + } else { + None } + } else { + None } } None => { if client_connected { - println!("CLIENT DIsCONNECTED!"); - client_connected = false; - status_sender - .send(false) - .await - .expect("couldnt send false status"); + Some(false) + } else { + None } } + }; + if let Some(c) = changed { + client_connected = c; + status_sender + .send(c) + .await + .expect("couldnt send connection status"); } - tokio::time::sleep(Duration::from_millis(850)).await; + tokio::time::sleep(Duration::from_millis(800)).await; } }); let sub_task = tokio::spawn(async move { - // ready message loop - // let ready_tx_ = ready_tx.clone(); - // loop { - // wait for CONNECTED - // loop { - // let status = status_rx.recv().await.unwrap(); - // if status { - // break; - // } - // } - // now wait for READY - // loop { - // let message = rx.recv().await.unwrap(); - // if let Some(payload) = message.payload.get(0) { - // let content = String::from_utf8_lossy(&payload[..]); - // log::info!("received message content: {}", content); - // if content == "READY" { - // // ready_tx.send(true).expect("could not send ready"); - // break; - // } - // } - // } - // now start parsing... or break for DISCONNECT - // println!("OK START PARSING!"); - loop { - let message = rx.recv().await.unwrap(); - println!("T = {}, P = {:?}", message.topic, message.payload.len()); - // println!("count {}", message.payload.len()); + while let Ok(message) = rx.recv().await { for payload in message.payload { if let Err(e) = msg_tx.send(payload.to_vec()).await { - println!("pub err {:?}", e); + log::warn!("pub err {:?}", e); } } } - // } }); let relay_task = tokio::spawn(async move { - loop { - let msg = receiver.recv().await.unwrap(); + while let Some(msg) = receiver.recv().await { tx.publish(PUB_TOPIC, false, msg.message) .await .expect("could not mqtt pub"); - let reply = msg_rx.recv().await.expect("could not unwrap msg_rx.recv()"); - if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) { - log::warn!("could not send on reply_tx"); + if let Ok(reply) = timeout(Duration::from_millis(1000), msg_rx.recv()).await { + if let Err(_) = msg.reply_tx.send(ChannelReply { + reply: reply.unwrap(), + }) { + log::warn!("could not send on reply_tx"); + } } } - // println!("ABORT! relay task finished <<<<<<<<<<<<<<<"); }); servers.await; diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index db86a8f..952cc8f 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -14,24 +14,21 @@ pub fn run_test() { let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); let runtime = start_broker(rx, status_tx, "test-1"); - log::info!("======> READY received! start now"); runtime.block_on(async { let mut connected = false; loop { tokio::select! { status = status_rx.recv() => { - // println!("got a status"); if let Some(connection_status) = status { connected = connection_status; id = 0; sequence = 1; - println!("========> CONNETED! {}", connection_status); + log::info!("========> CONNETED! {}", connection_status); } } res = iteration(id, sequence, tx.clone(), connected) => { - println!("iteration! {}", connected); if let Err(e) = res { - panic!("iteration failed {:?}", e); + log::warn!("===> iteration failed {:?}", e); } if connected { sequence = sequence.wrapping_add(1); @@ -53,7 +50,7 @@ pub async fn iteration( if !connected { return Ok(()); } - println!("do a ping!"); + // log::info!("do a ping!"); let ping = msgs::Ping { id, message: WireString("ping".as_bytes().to_vec()), @@ -65,7 +62,7 @@ pub async fn iteration( message: ping_bytes, reply_tx, }; - let _ = tx.send(request).await; + tx.send(request).await?; let res = reply_rx.await?; let reply = parser::response_from_bytes(res.reply, sequence)?; match reply {