mirror of
https://github.com/aljazceru/lightning.git
synced 2025-12-22 08:34:20 +01:00
cln-plugin: Add support for synchronous RPC methods
Changelog-Experimental: cln-plugin: Added support for non-async RPC method passthrough (async support coming soon)
This commit is contained in:
committed by
Rusty Russell
parent
22618a2f94
commit
8c6af21169
@@ -1,8 +1,9 @@
|
||||
use crate::codec::{JsonCodec, JsonRpcCodec};
|
||||
pub use anyhow::{anyhow, Context, Error};
|
||||
pub use anyhow::{anyhow, Context};
|
||||
use futures::sink::SinkExt;
|
||||
extern crate log;
|
||||
use log::trace;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::sync::Mutex;
|
||||
@@ -21,11 +22,18 @@ pub mod options;
|
||||
|
||||
use options::ConfigOption;
|
||||
|
||||
/// Need to tell us about something that went wrong? Use this error
|
||||
/// type to do that. Use this alias to be safe from future changes in
|
||||
/// our internal error handling, since we'll implement any necessary
|
||||
/// conversions for you :-)
|
||||
pub type Error = anyhow::Error;
|
||||
|
||||
/// Builder for a new plugin.
|
||||
pub struct Builder<S, I, O>
|
||||
where
|
||||
I: AsyncRead + Unpin,
|
||||
O: Send + AsyncWrite + Unpin,
|
||||
S: Clone + Send,
|
||||
{
|
||||
state: S,
|
||||
|
||||
@@ -39,6 +47,7 @@ where
|
||||
subscriptions: Subscriptions,
|
||||
|
||||
options: Vec<ConfigOption>,
|
||||
rpcmethods: HashMap<String, RpcMethod<S>>,
|
||||
}
|
||||
|
||||
impl<S, I, O> Builder<S, I, O>
|
||||
@@ -55,6 +64,7 @@ where
|
||||
hooks: Hooks::default(),
|
||||
subscriptions: Subscriptions::default(),
|
||||
options: vec![],
|
||||
rpcmethods: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,6 +73,23 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
pub fn rpcmethod(
|
||||
mut self,
|
||||
name: &str,
|
||||
description: &str,
|
||||
callback: Callback<S>,
|
||||
) -> Builder<S, I, O> {
|
||||
self.rpcmethods.insert(
|
||||
name.to_string(),
|
||||
RpcMethod {
|
||||
name: name.to_string(),
|
||||
description: description.to_string(),
|
||||
callback,
|
||||
},
|
||||
);
|
||||
self
|
||||
}
|
||||
|
||||
/// Build and start the plugin loop. This performs the handshake
|
||||
/// and spawns a new task that accepts incoming messages from
|
||||
/// c-lightning and dispatches them to the handlers. It only
|
||||
@@ -119,19 +146,31 @@ where
|
||||
o => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
|
||||
};
|
||||
|
||||
let (tx, _) = tokio::sync::broadcast::channel(1);
|
||||
let (wait_handle, _) = tokio::sync::broadcast::channel(1);
|
||||
|
||||
// Collect the callbacks and create the hashmap for the dispatcher.
|
||||
let mut rpcmethods = HashMap::new();
|
||||
for (name, callback) in self.rpcmethods.drain().map(|(k, v)| (k, v.callback)) {
|
||||
rpcmethods.insert(name, callback);
|
||||
}
|
||||
|
||||
// An MPSC pair used by anything that needs to send messages
|
||||
// to the main daemon.
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(4);
|
||||
let plugin = Plugin {
|
||||
state: self.state,
|
||||
options: self.options,
|
||||
wait_handle: tx,
|
||||
wait_handle,
|
||||
sender,
|
||||
};
|
||||
|
||||
// Start the PluginDriver to handle plugin IO
|
||||
tokio::spawn(
|
||||
PluginDriver {
|
||||
plugin: plugin.clone(),
|
||||
rpcmethods,
|
||||
}
|
||||
.run(input, output),
|
||||
.run(receiver, input, output),
|
||||
);
|
||||
|
||||
Ok(plugin)
|
||||
@@ -141,9 +180,19 @@ where
|
||||
&mut self,
|
||||
_call: messages::GetManifestCall,
|
||||
) -> messages::GetManifestResponse {
|
||||
let rpcmethods: Vec<_> = self
|
||||
.rpcmethods
|
||||
.values()
|
||||
.map(|v| messages::RpcMethod {
|
||||
name: v.name.clone(),
|
||||
description: v.description.clone(),
|
||||
usage: String::new(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
messages::GetManifestResponse {
|
||||
options: self.options.clone(),
|
||||
rpcmethods: vec![],
|
||||
rpcmethods,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,6 +220,20 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
type Callback<S> = Box<fn(Plugin<S>, &serde_json::Value) -> Result<serde_json::Value, Error>>;
|
||||
|
||||
/// A struct collecting the metadata required to register a custom
|
||||
/// rpcmethod with the main daemon upon init. It'll get deconstructed
|
||||
/// into just the callback after the init.
|
||||
struct RpcMethod<S>
|
||||
where
|
||||
S: Clone + Send,
|
||||
{
|
||||
callback: Callback<S>,
|
||||
description: String,
|
||||
name: String,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Plugin<S>
|
||||
where
|
||||
@@ -182,6 +245,8 @@ where
|
||||
|
||||
/// A signal that allows us to wait on the plugin's shutdown.
|
||||
wait_handle: tokio::sync::broadcast::Sender<()>,
|
||||
|
||||
sender: tokio::sync::mpsc::Sender<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// The [PluginDriver] is used to run the IO loop, reading messages
|
||||
@@ -195,9 +260,10 @@ where
|
||||
{
|
||||
#[allow(dead_code)]
|
||||
plugin: Plugin<S>,
|
||||
rpcmethods: HashMap<String, Callback<S>>,
|
||||
}
|
||||
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
impl<S> PluginDriver<S>
|
||||
where
|
||||
S: Send + Clone,
|
||||
@@ -205,16 +271,18 @@ where
|
||||
/// Run the plugin until we get a shutdown command.
|
||||
async fn run<I, O>(
|
||||
self,
|
||||
mut receiver: tokio::sync::mpsc::Receiver<serde_json::Value>,
|
||||
mut input: FramedRead<I, JsonRpcCodec>,
|
||||
_output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
|
||||
output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
I: Send + AsyncReadExt + Unpin,
|
||||
O: Send,
|
||||
O: Send + AsyncWriteExt + Unpin,
|
||||
{
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = PluginDriver::dispatch_one(&mut input, &self.plugin) => {},
|
||||
_ = self.dispatch_one(&mut input, &self.plugin) => {},
|
||||
v = receiver.recv() => {output.lock().await.send(v.unwrap()).await?},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -222,6 +290,7 @@ where
|
||||
/// Dispatch one server-side event and then return. Just so we
|
||||
/// have a nicer looking `select` statement in `run` :-)
|
||||
async fn dispatch_one<I>(
|
||||
&self,
|
||||
input: &mut FramedRead<I, JsonRpcCodec>,
|
||||
plugin: &Plugin<S>,
|
||||
) -> Result<(), Error>
|
||||
@@ -238,6 +307,31 @@ where
|
||||
messages::JsonRpc::Notification(n) => {
|
||||
PluginDriver::<S>::dispatch_notification(n, plugin).await
|
||||
}
|
||||
messages::JsonRpc::CustomRequest(id, p) => {
|
||||
match self.dispatch_custom_request(id, p, plugin).await {
|
||||
Ok(v) => plugin
|
||||
.sender
|
||||
.send(json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": id,
|
||||
"result": v
|
||||
}))
|
||||
.await
|
||||
.context("returning custom result"),
|
||||
Err(e) => plugin
|
||||
.sender
|
||||
.send(json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": id,
|
||||
"error": e.to_string(),
|
||||
}))
|
||||
.await
|
||||
.context("returning custom error"),
|
||||
}
|
||||
}
|
||||
messages::JsonRpc::CustomNotification(n) => {
|
||||
PluginDriver::<S>::dispatch_custom_notification(n, plugin).await
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(e)) => Err(anyhow!("Error reading command: {}", e)),
|
||||
@@ -263,6 +357,44 @@ where
|
||||
trace!("Dispatching notification {:?}", notification);
|
||||
unimplemented!()
|
||||
}
|
||||
async fn dispatch_custom_request(
|
||||
&self,
|
||||
_id: usize,
|
||||
request: serde_json::Value,
|
||||
plugin: &Plugin<S>,
|
||||
) -> Result<serde_json::Value, Error> {
|
||||
let method = request
|
||||
.get("method")
|
||||
.context("Missing 'method' in request")?
|
||||
.as_str()
|
||||
.context("'method' is not a string")?;
|
||||
|
||||
let params = request
|
||||
.get("params")
|
||||
.context("Missing 'params' field in request")?;
|
||||
let callback = self
|
||||
.rpcmethods
|
||||
.get(method)
|
||||
.with_context(|| anyhow!("No handler for method '{}' registered", method))?;
|
||||
|
||||
trace!(
|
||||
"Dispatching custom request: method={}, params={}",
|
||||
method,
|
||||
params
|
||||
);
|
||||
callback(plugin.clone(), params)
|
||||
}
|
||||
|
||||
async fn dispatch_custom_notification(
|
||||
notification: serde_json::Value,
|
||||
_plugin: &Plugin<S>,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: Send + Clone,
|
||||
{
|
||||
trace!("Dispatching notification {:?}", notification);
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Plugin<S>
|
||||
|
||||
Reference in New Issue
Block a user