broker reconnect working

This commit is contained in:
Evan Feenstra
2022-06-09 19:23:40 -07:00
parent 937ff0d8de
commit f1e8e13dda
3 changed files with 34 additions and 58 deletions

View File

@@ -17,12 +17,14 @@ pub struct Channel {
} }
/// Responses are received on the oneshot sender /// Responses are received on the oneshot sender
#[derive(Debug)]
pub struct ChannelRequest { pub struct ChannelRequest {
pub message: Vec<u8>, pub message: Vec<u8>,
pub reply_tx: oneshot::Sender<ChannelReply>, pub reply_tx: oneshot::Sender<ChannelReply>,
} }
// mpsc reply // mpsc reply
#[derive(Debug)]
pub struct ChannelReply { pub struct ChannelReply {
pub reply: Vec<u8>, pub reply: Vec<u8>,
} }
@@ -48,7 +50,7 @@ fn main() -> anyhow::Result<()> {
// Pretend to be the right version, given to us by an env var // Pretend to be the right version, given to us by an env var
let version = let version =
env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning"); env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning");
println!("{}", version); log::info!("{}", version);
return Ok(()); return Ok(());
} }

View File

@@ -8,6 +8,7 @@ use std::sync::Arc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::timeout;
const SUB_TOPIC: &str = "sphinx-return"; const SUB_TOPIC: &str = "sphinx-return";
const PUB_TOPIC: &str = "sphinx"; const PUB_TOPIC: &str = "sphinx";
@@ -48,85 +49,61 @@ pub fn start_broker(
let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx)); let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx));
loop { loop {
let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); let metrics = consolelink::request_metrics(console.clone(), client_id.clone());
match metrics.tracker() { let changed: Option<bool> = match metrics.tracker() {
Some(t) => { Some(t) => {
// wait for subscription to be sure // wait for subscription to be sure
if t.concrete_subscriptions_len() > 0 { if t.concrete_subscriptions_len() > 0 {
if !client_connected { if !client_connected {
println!("CLIENT CONNECTED!"); Some(true) // changed to true
client_connected = true; } else {
status_sender None
.send(true)
.await
.expect("couldnt send true statu");
} }
} else {
None
} }
} }
None => { None => {
if client_connected { if client_connected {
println!("CLIENT DIsCONNECTED!"); Some(false)
client_connected = false; } else {
status_sender None
.send(false)
.await
.expect("couldnt send false status");
} }
} }
};
if let Some(c) = changed {
client_connected = c;
status_sender
.send(c)
.await
.expect("couldnt send connection status");
} }
tokio::time::sleep(Duration::from_millis(850)).await; tokio::time::sleep(Duration::from_millis(800)).await;
} }
}); });
let sub_task = tokio::spawn(async move { let sub_task = tokio::spawn(async move {
// ready message loop while let Ok(message) = rx.recv().await {
// let ready_tx_ = ready_tx.clone();
// loop {
// wait for CONNECTED
// loop {
// let status = status_rx.recv().await.unwrap();
// if status {
// break;
// }
// }
// now wait for READY
// loop {
// let message = rx.recv().await.unwrap();
// if let Some(payload) = message.payload.get(0) {
// let content = String::from_utf8_lossy(&payload[..]);
// log::info!("received message content: {}", content);
// if content == "READY" {
// // ready_tx.send(true).expect("could not send ready");
// break;
// }
// }
// }
// now start parsing... or break for DISCONNECT
// println!("OK START PARSING!");
loop {
let message = rx.recv().await.unwrap();
println!("T = {}, P = {:?}", message.topic, message.payload.len());
// println!("count {}", message.payload.len());
for payload in message.payload { for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()).await { if let Err(e) = msg_tx.send(payload.to_vec()).await {
println!("pub err {:?}", e); log::warn!("pub err {:?}", e);
} }
} }
} }
// }
}); });
let relay_task = tokio::spawn(async move { let relay_task = tokio::spawn(async move {
loop { while let Some(msg) = receiver.recv().await {
let msg = receiver.recv().await.unwrap();
tx.publish(PUB_TOPIC, false, msg.message) tx.publish(PUB_TOPIC, false, msg.message)
.await .await
.expect("could not mqtt pub"); .expect("could not mqtt pub");
let reply = msg_rx.recv().await.expect("could not unwrap msg_rx.recv()"); if let Ok(reply) = timeout(Duration::from_millis(1000), msg_rx.recv()).await {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) { if let Err(_) = msg.reply_tx.send(ChannelReply {
log::warn!("could not send on reply_tx"); reply: reply.unwrap(),
}) {
log::warn!("could not send on reply_tx");
}
} }
} }
// println!("ABORT! relay task finished <<<<<<<<<<<<<<<");
}); });
servers.await; servers.await;

View File

@@ -14,24 +14,21 @@ pub fn run_test() {
let (tx, rx) = mpsc::channel(1000); let (tx, rx) = mpsc::channel(1000);
let (status_tx, mut status_rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000);
let runtime = start_broker(rx, status_tx, "test-1"); let runtime = start_broker(rx, status_tx, "test-1");
log::info!("======> READY received! start now");
runtime.block_on(async { runtime.block_on(async {
let mut connected = false; let mut connected = false;
loop { loop {
tokio::select! { tokio::select! {
status = status_rx.recv() => { status = status_rx.recv() => {
// println!("got a status");
if let Some(connection_status) = status { if let Some(connection_status) = status {
connected = connection_status; connected = connection_status;
id = 0; id = 0;
sequence = 1; sequence = 1;
println!("========> CONNETED! {}", connection_status); log::info!("========> CONNETED! {}", connection_status);
} }
} }
res = iteration(id, sequence, tx.clone(), connected) => { res = iteration(id, sequence, tx.clone(), connected) => {
println!("iteration! {}", connected);
if let Err(e) = res { if let Err(e) = res {
panic!("iteration failed {:?}", e); log::warn!("===> iteration failed {:?}", e);
} }
if connected { if connected {
sequence = sequence.wrapping_add(1); sequence = sequence.wrapping_add(1);
@@ -53,7 +50,7 @@ pub async fn iteration(
if !connected { if !connected {
return Ok(()); return Ok(());
} }
println!("do a ping!"); // log::info!("do a ping!");
let ping = msgs::Ping { let ping = msgs::Ping {
id, id,
message: WireString("ping".as_bytes().to_vec()), message: WireString("ping".as_bytes().to_vec()),
@@ -65,7 +62,7 @@ pub async fn iteration(
message: ping_bytes, message: ping_bytes,
reply_tx, reply_tx,
}; };
let _ = tx.send(request).await; tx.send(request).await?;
let res = reply_rx.await?; let res = reply_rx.await?;
let reply = parser::response_from_bytes(res.reply, sequence)?; let reply = parser::response_from_bytes(res.reply, sequence)?;
match reply { match reply {