mirror of
https://github.com/aljazceru/lightning.git
synced 2025-12-21 16:14:23 +01:00
cln-plugin: Make rpcmethod handlers async
This commit is contained in:
committed by
Rusty Russell
parent
60e773239c
commit
a7ef38732f
@@ -13,15 +13,15 @@ async fn main() -> Result<(), anyhow::Error> {
|
|||||||
options::Value::Integer(42),
|
options::Value::Integer(42),
|
||||||
"a test-option with default 42",
|
"a test-option with default 42",
|
||||||
))
|
))
|
||||||
.rpcmethod("testmethod", "This is a test", Box::new(testmethod))
|
.rpcmethod("testmethod", "This is a test", testmethod)
|
||||||
.subscribe("connect", Box::new(connect_handler))
|
.subscribe("connect", Box::new(connect_handler))
|
||||||
.hook("peer_connected", Box::new(peer_connected_handler))
|
//.hook("peer_connected", Box::new(peer_connected_handler))
|
||||||
.start()
|
.start()
|
||||||
.await?;
|
.await?;
|
||||||
plugin.join().await
|
plugin.join().await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn testmethod(_p: Plugin<()>, _v: &serde_json::Value) -> Result<serde_json::Value, Error> {
|
async fn testmethod(_p: Plugin<()>, _v: serde_json::Value) -> Result<serde_json::Value, Error> {
|
||||||
Ok(json!("Hello"))
|
Ok(json!("Hello"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,8 @@ use futures::sink::SinkExt;
|
|||||||
extern crate log;
|
extern crate log;
|
||||||
use log::trace;
|
use log::trace;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
@@ -84,18 +86,18 @@ where
|
|||||||
|
|
||||||
/// Register a custom RPC method for the RPC passthrough from the
|
/// Register a custom RPC method for the RPC passthrough from the
|
||||||
/// main daemon
|
/// main daemon
|
||||||
pub fn rpcmethod(
|
pub fn rpcmethod<C, F>(mut self, name: &str, description: &str, callback: C) -> Builder<S, I, O>
|
||||||
mut self,
|
where
|
||||||
name: &str,
|
C: Send + Sync + 'static,
|
||||||
description: &str,
|
C: Fn(Plugin<S>, Request) -> F + 'static,
|
||||||
callback: Callback<S>,
|
F: Future<Output = Response> + Send + Sync + 'static,
|
||||||
) -> Builder<S, I, O> {
|
{
|
||||||
self.rpcmethods.insert(
|
self.rpcmethods.insert(
|
||||||
name.to_string(),
|
name.to_string(),
|
||||||
RpcMethod {
|
RpcMethod {
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
description: description.to_string(),
|
description: description.to_string(),
|
||||||
callback,
|
callback: Box::new(move |p, r| Box::pin(callback(p, r))),
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
self
|
self
|
||||||
@@ -171,9 +173,10 @@ where
|
|||||||
|
|
||||||
// TODO Split the two hashmaps once we fill in the hook
|
// TODO Split the two hashmaps once we fill in the hook
|
||||||
// payload structs in messages.rs
|
// payload structs in messages.rs
|
||||||
let mut rpcmethods: HashMap<String, Callback<S>> =
|
let mut rpcmethods: HashMap<String, AsyncCallback<S>> =
|
||||||
HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback)));
|
HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback)));
|
||||||
rpcmethods.extend(self.hooks.clone().drain().map(|(k, v)| (k, v.callback)));
|
// TODO re-enable once hooks are async again
|
||||||
|
//rpcmethods.extend(self.hooks.clone().drain().map(|(k, v)| (k, v.callback)));
|
||||||
|
|
||||||
// Start the PluginDriver to handle plugin IO
|
// Start the PluginDriver to handle plugin IO
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
@@ -237,8 +240,14 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Callback<S> = Box<fn(Plugin<S>, &serde_json::Value) -> Result<serde_json::Value, Error>>;
|
// Just some type aliases so we don't get confused in a lisp-like sea
|
||||||
type NotificationCallback<S> = Box<fn(Plugin<S>, &serde_json::Value) -> Result<(), Error>>;
|
// of parentheses.
|
||||||
|
type Request = serde_json::Value;
|
||||||
|
type Response = Result<serde_json::Value, Error>;
|
||||||
|
type Callback<S> = Box<fn(Plugin<S>, &Request) -> Response>;
|
||||||
|
type AsyncCallback<S> =
|
||||||
|
Box<dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Response> + Send>> + Send + Sync>;
|
||||||
|
type NotificationCallback<S> = Box<fn(Plugin<S>, &Request) -> Result<(), Error>>;
|
||||||
|
|
||||||
/// A struct collecting the metadata required to register a custom
|
/// A struct collecting the metadata required to register a custom
|
||||||
/// rpcmethod with the main daemon upon init. It'll get deconstructed
|
/// rpcmethod with the main daemon upon init. It'll get deconstructed
|
||||||
@@ -247,7 +256,7 @@ struct RpcMethod<S>
|
|||||||
where
|
where
|
||||||
S: Clone + Send,
|
S: Clone + Send,
|
||||||
{
|
{
|
||||||
callback: Callback<S>,
|
callback: AsyncCallback<S>,
|
||||||
description: String,
|
description: String,
|
||||||
name: String,
|
name: String,
|
||||||
}
|
}
|
||||||
@@ -291,9 +300,8 @@ struct PluginDriver<S>
|
|||||||
where
|
where
|
||||||
S: Send + Clone,
|
S: Send + Clone,
|
||||||
{
|
{
|
||||||
|
|
||||||
plugin: Plugin<S>,
|
plugin: Plugin<S>,
|
||||||
rpcmethods: HashMap<String, Callback<S>>,
|
rpcmethods: HashMap<String, AsyncCallback<S>>,
|
||||||
|
|
||||||
#[allow(dead_code)] // Unused until we fill in the Hook structs.
|
#[allow(dead_code)] // Unused until we fill in the Hook structs.
|
||||||
hooks: HashMap<String, Callback<S>>,
|
hooks: HashMap<String, Callback<S>>,
|
||||||
@@ -420,7 +428,7 @@ where
|
|||||||
method,
|
method,
|
||||||
params
|
params
|
||||||
);
|
);
|
||||||
callback(plugin.clone(), params)
|
callback(plugin.clone(), params.clone()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn dispatch_custom_notification(
|
async fn dispatch_custom_notification(
|
||||||
|
|||||||
@@ -55,5 +55,5 @@ def test_plugin_start(node_factory):
|
|||||||
assert l1.rpc.testmethod() == "Hello"
|
assert l1.rpc.testmethod() == "Hello"
|
||||||
|
|
||||||
l1.connect(l2)
|
l1.connect(l2)
|
||||||
l1.daemon.wait_for_log(r'Got a connect hook call')
|
#l1.daemon.wait_for_log(r'Got a connect hook call')
|
||||||
l1.daemon.wait_for_log(r'Got a connect notification')
|
l1.daemon.wait_for_log(r'Got a connect notification')
|
||||||
|
|||||||
Reference in New Issue
Block a user