diff --git a/plugins/src/codec.rs b/plugins/src/codec.rs index b6037c9c9..ad5128722 100644 --- a/plugins/src/codec.rs +++ b/plugins/src/codec.rs @@ -11,8 +11,8 @@ use std::str::FromStr; use std::{io, str}; use tokio_util::codec::{Decoder, Encoder}; -use crate::messages::{Notification, Request}; use crate::messages::JsonRpc; +use crate::messages::{Notification, Request}; /// A simple codec that parses messages separated by two successive /// `\n` newlines. diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index e6c584ce8..9decd0791 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -322,7 +322,7 @@ where hooks: self.hooks.keys().map(|s| s.clone()).collect(), rpcmethods, dynamic: self.dynamic, - nonnumericids: true, + nonnumericids: true, } } @@ -458,8 +458,8 @@ where // Start the PluginDriver to handle plugin IO tokio::spawn(async move { if let Err(e) = driver.run(receiver, input, output).await { - log::warn!("Plugin loop returned error {:?}", e); - } + log::warn!("Plugin loop returned error {:?}", e); + } // Now that we have left the reader loop its time to // notify any waiting tasks. This most likely will cause @@ -507,7 +507,7 @@ where impl PluginDriver where - S: Send + Clone, + S: Send + Clone + Sync, { /// Run the plugin until we get a shutdown command. async fn run( @@ -526,17 +526,17 @@ where // the user-code, which may require some cleanups or // similar. tokio::select! { - e = self.dispatch_one(&mut input, &self.plugin) => { - if let Err(e) = e { - return Err(e) - } - }, - v = receiver.recv() => { - output.lock().await.send( - v.context("internal communication error")? - ).await?; - }, - } + e = self.dispatch_one(&mut input, &self.plugin) => { + if let Err(e) = e { + return Err(e) + } + }, + v = receiver.recv() => { + output.lock().await.send( + v.context("internal communication error")? + ).await?; + }, + } } } @@ -554,36 +554,74 @@ where Some(Ok(msg)) => { trace!("Received a message: {:?}", msg); match msg { - messages::JsonRpc::Request(id, p) => { - PluginDriver::::dispatch_request(id, p, plugin).await + messages::JsonRpc::Request(_id, _p) => { + todo!("This is unreachable until we start filling in messages:Request. Until then the custom dispatcher below is used exclusively."); } - messages::JsonRpc::Notification(n) => { - self.dispatch_notification(n, plugin).await + messages::JsonRpc::Notification(_n) => { + todo!("As soon as we define the full structure of the messages::Notification we'll get here. Until then the custom dispatcher below is used.") } - messages::JsonRpc::CustomRequest(id, p) => { - match self.dispatch_custom_request(id.clone(), p, plugin).await { - Ok(v) => plugin - .sender - .send(json!({ - "jsonrpc": "2.0", - "id": id, - "result": v - })) - .await - .context("returning custom result"), - Err(e) => plugin - .sender - .send(json!({ - "jsonrpc": "2.0", - "id": id, - "error": e.to_string(), - })) - .await - .context("returning custom error"), - } + messages::JsonRpc::CustomRequest(id, request) => { + trace!("Dispatching custom method {:?}", request); + let method = request + .get("method") + .context("Missing 'method' in request")? + .as_str() + .context("'method' is not a string")?; + let callback = self.rpcmethods.get(method).with_context(|| { + anyhow!("No handler for method '{}' registered", method) + })?; + let params = request + .get("params") + .context("Missing 'params' field in request")? + .clone(); + + let plugin = plugin.clone(); + let call = callback(plugin.clone(), params); + + tokio::spawn(async move { + match call.await { + Ok(v) => plugin + .sender + .send(json!({ + "jsonrpc": "2.0", + "id": id, + "result": v + })) + .await + .context("returning custom response"), + Err(e) => plugin + .sender + .send(json!({ + "jsonrpc": "2.0", + "id": id, + "error": e.to_string(), + })) + .await + .context("returning custom error"), + } + }); + Ok(()) } - messages::JsonRpc::CustomNotification(n) => { - self.dispatch_custom_notification(n, plugin).await + messages::JsonRpc::CustomNotification(request) => { + trace!("Dispatching custom notification {:?}", request); + let method = request + .get("method") + .context("Missing 'method' in request")? + .as_str() + .context("'method' is not a string")?; + let callback = self.subscriptions.get(method).with_context(|| { + anyhow!("No handler for notification '{}' registered", method) + })?; + let params = request + .get("params") + .context("Missing 'params' field in request")? + .clone(); + + let plugin = plugin.clone(); + let call = callback(plugin.clone(), params); + + tokio::spawn(async move { call.await.unwrap() }); + Ok(()) } } } @@ -591,85 +629,6 @@ where None => Err(anyhow!("Error reading from master")), } } - - async fn dispatch_request( - _id: serde_json::Value, - _request: messages::Request, - _plugin: &Plugin, - ) -> Result<(), Error> { - todo!("This is unreachable until we start filling in messages:Request. Until then the custom dispatcher below is used exclusively.") - } - - async fn dispatch_notification( - &self, - _notification: messages::Notification, - _plugin: &Plugin, - ) -> Result<(), Error> - where - S: Send + Clone, - { - todo!("As soon as we define the full structure of the messages::Notification we'll get here. Until then the custom dispatcher below is used.") - } - - async fn dispatch_custom_request( - &self, - _id: serde_json::Value, - request: serde_json::Value, - plugin: &Plugin, - ) -> Result { - let method = request - .get("method") - .context("Missing 'method' in request")? - .as_str() - .context("'method' is not a string")?; - - let params = request - .get("params") - .context("Missing 'params' field in request")?; - let callback = self - .rpcmethods - .get(method) - .with_context(|| anyhow!("No handler for method '{}' registered", method))?; - - trace!( - "Dispatching custom request: method={}, params={}", - method, - params - ); - callback(plugin.clone(), params.clone()).await - } - - async fn dispatch_custom_notification( - &self, - notification: serde_json::Value, - plugin: &Plugin, - ) -> Result<(), Error> - where - S: Send + Clone, - { - trace!("Dispatching custom notification {:?}", notification); - let method = notification - .get("method") - .context("Missing 'method' in notification")? - .as_str() - .context("'method' is not a string")?; - let params = notification - .get("params") - .context("Missing 'params' field in notification")?; - let callback = self - .subscriptions - .get(method) - .with_context(|| anyhow!("No handler for method '{}' registered", method))?; - trace!( - "Dispatching custom request: method={}, params={}", - method, - params - ); - if let Err(e) = callback(plugin.clone(), params.clone()).await { - log::error!("Error in notification handler '{}': {}", method, e); - } - Ok(()) - } } impl Plugin @@ -715,7 +674,7 @@ mod test { #[tokio::test] async fn init() { - let state = (); + let state = (); let builder = Builder::new(tokio::io::stdin(), tokio::io::stdout()); let _ = builder.start(state); } diff --git a/plugins/src/messages.rs b/plugins/src/messages.rs index 0a7a8e71b..ff3c9fae1 100644 --- a/plugins/src/messages.rs +++ b/plugins/src/messages.rs @@ -41,20 +41,20 @@ pub(crate) enum Request { #[serde(tag = "method", content = "params")] #[serde(rename_all = "snake_case")] pub(crate) enum Notification { -// ChannelOpened, -// ChannelOpenFailed, -// ChannelStateChanged, -// Connect, -// Disconnect, -// InvoicePayment, -// InvoiceCreation, -// Warning, -// ForwardEvent, -// SendpaySuccess, -// SendpayFailure, -// CoinMovement, -// OpenchannelPeerSigs, -// Shutdown, + // ChannelOpened, + // ChannelOpenFailed, + // ChannelStateChanged, + // Connect, + // Disconnect, + // InvoicePayment, + // InvoiceCreation, + // Warning, + // ForwardEvent, + // SendpaySuccess, + // SendpayFailure, + // CoinMovement, + // OpenchannelPeerSigs, + // Shutdown, } #[derive(Deserialize, Debug)] diff --git a/plugins/src/options.rs b/plugins/src/options.rs index 909ccd7ff..65fd51759 100644 --- a/plugins/src/options.rs +++ b/plugins/src/options.rs @@ -28,7 +28,7 @@ impl Value { _ => None, } } - + /// Returns true if the `Value` is an integer between `i64::MIN` and /// `i64::MAX`. /// @@ -36,8 +36,6 @@ impl Value { /// return the integer value. pub fn is_i64(&self) -> bool { self.as_i64().is_some() - - } /// If the `Value` is an integer, represent it as i64. Returns diff --git a/tests/test_cln_rs.py b/tests/test_cln_rs.py index 49ea388ae..9485bc259 100644 --- a/tests/test_cln_rs.py +++ b/tests/test_cln_rs.py @@ -249,10 +249,6 @@ def test_grpc_wrong_auth(node_factory): stub.Getinfo(nodepb.GetinfoRequest()) -@pytest.mark.xfail( - reason="Times out because we can't call the RPC method while currently holding on to HTLCs", - strict=True, -) def test_cln_plugin_reentrant(node_factory, executor): """Ensure that we continue processing events while already handling.