diff --git a/plugins/grpc-plugin/src/main.rs b/plugins/grpc-plugin/src/main.rs index a83f0131d..70dffa5d3 100644 --- a/plugins/grpc-plugin/src/main.rs +++ b/plugins/grpc-plugin/src/main.rs @@ -14,7 +14,7 @@ struct PluginState { ca_cert: Vec, } -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { debug!("Starting grpc plugin"); let path = Path::new("lightning-rpc"); @@ -58,13 +58,18 @@ async fn main() -> Result<()> { let bind_addr: SocketAddr = format!("0.0.0.0:{}", bind_port).parse().unwrap(); - tokio::spawn(async move { - if let Err(e) = run_interface(bind_addr, state).await { - warn!("Error running the grpc interface: {}", e); + tokio::select! { + _ = plugin.join() => { + // This will likely never be shown, if we got here our + // parent process is exiting and not processing out log + // messages anymore. + debug!("Plugin loop terminated") } - }); - - plugin.join().await + e = run_interface(bind_addr, state) => { + warn!("Error running grpc interface: {:?}", e) + } + } + Ok(()) } async fn run_interface(bind_addr: SocketAddr, state: PluginState) -> Result<()> { diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 53720790c..1da5f379c 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -427,13 +427,19 @@ where )) .await .context("sending init response")?; + + let joiner = plugin.wait_handle.clone(); // Start the PluginDriver to handle plugin IO - tokio::spawn( - driver.run(receiver, input, output), - // TODO Use the broadcast to distribute any error that we - // might receive here to anyone listening. (Shutdown - // signal) - ); + tokio::spawn(async move { + if let Err(e) = driver.run(receiver, input, output).await { + log::warn!("Plugin loop returned error {:?}", e); + } + + // Now that we have left the reader loop its time to + // notify any waiting tasks. This most likely will cause + // the main task to exit and the plugin to terminate. + joiner.send(()) + }); Ok(plugin) } @@ -502,16 +508,17 @@ where // the user-code, which may require some cleanups or // similar. tokio::select! { - e = self.dispatch_one(&mut input, &self.plugin) => { - //Hand any error up. - e?; - }, - v = receiver.recv() => { - output.lock().await.send( - v.context("internal communication error")? - ).await?; - }, - } + e = self.dispatch_one(&mut input, &self.plugin) => { + if let Err(e) = e { + return Err(e) + } + }, + v = receiver.recv() => { + output.lock().await.send( + v.context("internal communication error")? + ).await?; + }, + } } }