diff --git a/Cargo.toml b/Cargo.toml index 288c89347..b597a2cef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,4 +2,5 @@ members = [ "cln-rpc", "cln-grpc", + "plugins", ] diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml new file mode 100644 index 000000000..49fbc6db0 --- /dev/null +++ b/plugins/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "cln-plugin" +version = "0.1.0" +edition = "2021" + +[[example]] +name = "cln-plugin-startup" +path = "examples/cln-plugin-startup.rs" + +[dependencies] +anyhow = "1.0.51" +bytes = "1.1.0" +log = "0.4.14" +serde = { version = "1.0.131", features = ["derive"] } +serde_json = "1.0.72" +tokio-util = { version = "0.6.9", features = ["codec"] } +tokio = { version="1", features = ['io-std', 'rt'] } +tokio-stream = "*" +futures = "0.3" +cln-rpc = { path = "../cln-rpc" } + +[dev-dependencies] +tokio = { version = "1", features = ["macros", "rt-multi-thread", ] } +env_logger = "*" +cln-grpc = { path = "../cln-grpc" } diff --git a/plugins/Makefile b/plugins/Makefile index 3707e7688..70302311b 100644 --- a/plugins/Makefile +++ b/plugins/Makefile @@ -173,4 +173,14 @@ ALL_C_HEADERS += plugins/list_of_builtin_plugins_gen.h plugins/list_of_builtin_plugins_gen.h: plugins/Makefile Makefile @$(call VERBOSE,GEN $@,echo "static const char *list_of_builtin_plugins[] = { $(foreach d,$(notdir $(PLUGINS)),\"$d\",) NULL };" > $@) +CLN_PLUGIN_EXAMPLES := target/${RUST_PROFILE}/examples/cln-plugin-startup +CLN_PLUGIN_SRC = $(shell find plugins/src -name "*.rs") + +${CLN_PLUGIN_EXAMPLES}: ${CLN_PLUGIN_SRC} + (cd plugins; cargo build ${CARGO_OPTS} --examples) + +ifneq ($(RUST),0) +DEFAULT_TARGETS += $(CLN_PLUGIN_EXAMPLES) +endif + include plugins/test/Makefile diff --git a/plugins/examples/cln-plugin-startup.rs b/plugins/examples/cln-plugin-startup.rs new file mode 100644 index 000000000..f1c6ed26b --- /dev/null +++ b/plugins/examples/cln-plugin-startup.rs @@ -0,0 +1,14 @@ +//! This is a test plugin used to verify that we can compile and run +//! plugins using the Rust API against c-lightning. + +use cln_plugin::Builder; +use tokio; + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + env_logger::init(); + + let (plugin, stdin) = Builder::new((), tokio::io::stdin(), tokio::io::stdout()).build(); + plugin.run(stdin).await; + Ok(()) +} diff --git a/plugins/src/codec.rs b/plugins/src/codec.rs new file mode 100644 index 000000000..e3d1a5fef --- /dev/null +++ b/plugins/src/codec.rs @@ -0,0 +1,207 @@ +/// The codec is used to encode and decode messages received from and +/// sent to the main daemon. The protocol uses `stdout` and `stdin` to +/// exchange JSON formatted messages. Each message is separated by an +/// empty line and we're guaranteed that no other empty line is +/// present in the messages. +use crate::Error; +use anyhow::anyhow; +use bytes::{BufMut, BytesMut}; +use serde_json::value::Value; +use std::str::FromStr; +use std::{io, str}; +use tokio_util::codec::{Decoder, Encoder}; + +use crate::messages::{Notification, Request}; +pub use crate::messages::JsonRpc; + +/// A simple codec that parses messages separated by two successive +/// `\n` newlines. +#[derive(Default)] +pub struct MultiLineCodec {} + +/// Find two consecutive newlines, i.e., an empty line, signalling the +/// end of one message and the start of the next message. +fn find_separator(buf: &mut BytesMut) -> Option { + buf.iter() + .zip(buf.iter().skip(1)) + .position(|b| *b.0 == b'\n' && *b.1 == b'\n') +} + +fn utf8(buf: &[u8]) -> Result<&str, io::Error> { + str::from_utf8(buf) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Unable to decode input as UTF8")) +} + +impl Decoder for MultiLineCodec { + type Item = String; + type Error = Error; + fn decode(&mut self, buf: &mut BytesMut) -> Result, Error> { + if let Some(newline_offset) = find_separator(buf) { + let line = buf.split_to(newline_offset + 2); + let line = &line[..line.len() - 2]; + let line = utf8(line)?; + Ok(Some(line.to_string())) + } else { + Ok(None) + } + } +} + +impl Encoder for MultiLineCodec +where + T: AsRef, +{ + type Error = Error; + fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), Self::Error> { + let line = line.as_ref(); + buf.reserve(line.len() + 2); + buf.put(line.as_bytes()); + buf.put_u8(b'\n'); + buf.put_u8(b'\n'); + Ok(()) + } +} + +#[derive(Default)] +pub struct JsonCodec { + /// Sub-codec used to split the input into chunks that can then be + /// parsed by the JSON parser. + inner: MultiLineCodec, +} + +impl Encoder for JsonCodec +where + T: Into, +{ + type Error = Error; + fn encode(&mut self, msg: T, buf: &mut BytesMut) -> Result<(), Self::Error> { + let s = msg.into().to_string(); + self.inner.encode(s, buf) + } +} + +impl Decoder for JsonCodec { + type Item = Value; + type Error = Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Error> { + match self.inner.decode(buf) { + Ok(None) => Ok(None), + Err(e) => Err(e), + Ok(Some(s)) => { + if let Ok(v) = Value::from_str(&s) { + Ok(Some(v)) + } else { + Err(anyhow!("failed to parse JSON")) + } + } + } + } +} + +/// A codec that reads fully formed [crate::messages::JsonRpc] +/// messages. Internally it uses the [JsonCodec] which itself is built +/// on the [MultiLineCodec]. +#[derive(Default)] +pub(crate) struct JsonRpcCodec { + inner: JsonCodec, +} + +impl Decoder for JsonRpcCodec { + type Item = JsonRpc; + type Error = Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Error> { + match self.inner.decode(buf) { + Ok(None) => Ok(None), + Err(e) => Err(e), + Ok(Some(s)) => { + let req: Self::Item = serde_json::from_value(s)?; + Ok(Some(req)) + } + } + } +} + +#[cfg(test)] +mod test { + use super::{find_separator, JsonCodec, MultiLineCodec}; + use bytes::{BufMut, BytesMut}; + use serde_json::json; + use tokio_util::codec::{Decoder, Encoder}; + + #[test] + fn test_separator() { + struct Test(String, Option); + let tests = vec![ + Test("".to_string(), None), + Test("}\n\n".to_string(), Some(1)), + Test("\"hello\"},\n\"world\"}\n\n".to_string(), Some(18)), + ]; + + for t in tests.iter() { + let mut buf = BytesMut::new(); + buf.put_slice(t.0.as_bytes()); + assert_eq!(find_separator(&mut buf), t.1); + } + } + + #[test] + fn test_ml_decoder() { + struct Test(String, Option, String); + let tests = vec![ + Test("".to_string(), None, "".to_string()), + Test( + "{\"hello\":\"world\"}\n\nremainder".to_string(), + Some("{\"hello\":\"world\"}".to_string()), + "remainder".to_string(), + ), + Test( + "{\"hello\":\"world\"}\n\n{}\n\nremainder".to_string(), + Some("{\"hello\":\"world\"}".to_string()), + "{}\n\nremainder".to_string(), + ), + ]; + + for t in tests.iter() { + let mut buf = BytesMut::new(); + buf.put_slice(t.0.as_bytes()); + + let mut codec = MultiLineCodec::default(); + let mut remainder = BytesMut::new(); + remainder.put_slice(t.2.as_bytes()); + + assert_eq!(codec.decode(&mut buf).unwrap(), t.1); + assert_eq!(buf, remainder); + } + } + + #[test] + fn test_ml_encoder() { + let tests = vec!["test"]; + + for t in tests.iter() { + let mut buf = BytesMut::new(); + let mut codec = MultiLineCodec::default(); + let mut expected = BytesMut::new(); + expected.put_slice(t.as_bytes()); + expected.put_u8(b'\n'); + expected.put_u8(b'\n'); + codec.encode(t, &mut buf).unwrap(); + assert_eq!(buf, expected); + } + } + + #[test] + fn test_json_codec() { + let tests = vec![json!({"hello": "world"})]; + + for t in tests.iter() { + let mut codec = JsonCodec::default(); + let mut buf = BytesMut::new(); + codec.encode(t.clone(), &mut buf).unwrap(); + let decoded = codec.decode(&mut buf).unwrap().unwrap(); + assert_eq!(&decoded, t); + } + } +} diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs new file mode 100644 index 000000000..ca43e567b --- /dev/null +++ b/plugins/src/lib.rs @@ -0,0 +1,191 @@ +use crate::codec::{JsonCodec, JsonRpcCodec}; +use futures::sink::SinkExt; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio_util::codec::FramedWrite; +pub mod codec; +mod messages; +pub use anyhow::Error; +use log::{trace, warn}; +use std::marker::PhantomData; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_stream::StreamExt; +use tokio_util::codec::FramedRead; + +#[macro_use] +extern crate serde_json; + +/// Builder for a new plugin. +pub struct Builder +where + S: Clone + Send, + I: AsyncRead + Unpin, + O: Send + AsyncWrite + Unpin, +{ + state: S, + + input: I, + output: O, + + #[allow(dead_code)] + hooks: Hooks, + + #[allow(dead_code)] + subscriptions: Subscriptions, +} + +impl Builder +where + O: Send + AsyncWrite + Unpin + 'static, + S: Clone + Send + 'static, + I: AsyncRead + Send + Unpin + 'static, +{ + pub fn new(state: S, input: I, output: O) -> Self { + Self { + state, + input, + output, + hooks: Hooks::default(), + subscriptions: Subscriptions::default(), + } + } + + pub async fn run(self) -> Result<(), Error> { + let (plugin, input) = self.build(); + plugin.run(input).await + } + + pub fn build(self) -> (Plugin, I) { + ( + Plugin { + state: Arc::new(Mutex::new(self.state)), + output: Arc::new(Mutex::new(FramedWrite::new( + self.output, + JsonCodec::default(), + ))), + input_type: PhantomData, + }, + self.input, + ) + } +} + +pub struct Plugin +where + S: Clone + Send, + I: AsyncRead, + O: Send + AsyncWrite, +{ + //input: FramedRead, + output: Arc>>, + + /// The state gets cloned for each request + state: Arc>, + input_type: PhantomData, +} +impl Plugin +where + S: Clone + Send, + I: AsyncRead + Send + Unpin, + O: Send + AsyncWrite + Unpin, +{ + /// Read incoming requests from `c-lightning and dispatch their handling. + #[allow(unused_mut)] + pub async fn run(mut self, input: I) -> Result<(), Error> { + let mut input = FramedRead::new(input, JsonRpcCodec::default()); + loop { + match input.next().await { + Some(Ok(msg)) => { + trace!("Received a message: {:?}", msg); + match msg { + messages::JsonRpc::Request(id, p) => { + self.dispatch_request(id, p).await? + // Use a match to detect Ok / Error and return an error if we failed. + } + messages::JsonRpc::Notification(n) => self.dispatch_notification(n).await?, + } + } + Some(Err(e)) => { + warn!("Error reading command: {}", e); + break; + } + None => break, + } + } + Ok(()) + } + + async fn dispatch_request( + &mut self, + id: usize, + request: messages::Request, + ) -> Result<(), Error> { + trace!("Dispatching request {:?}", request); + let state = self.state.clone(); + let res: serde_json::Value = match request { + messages::Request::Getmanifest(c) => { + serde_json::to_value(Plugin::::handle_get_manifest(c, state).await?) + .unwrap() + } + messages::Request::Init(c) => { + serde_json::to_value(Plugin::::handle_init(c, state).await?).unwrap() + } + o => panic!("Request {:?} is currently unhandled", o), + }; + trace!("Sending respone {:?}", res); + + let mut out = self.output.lock().await; + out.send(json!({ + "jsonrpc": "2.0", + "result": res, + "id": id, + })) + .await + .unwrap(); + Ok(()) + } + + async fn dispatch_notification( + &mut self, + notification: messages::Notification, + ) -> Result<(), Error> { + trace!("Dispatching notification {:?}", notification); + unimplemented!() + } + + async fn handle_get_manifest( + _call: messages::GetManifestCall, + _state: Arc>, + ) -> Result { + Ok(messages::GetManifestResponse::default()) + } + + async fn handle_init( + _call: messages::InitCall, + _state: Arc>, + ) -> Result { + Ok(messages::InitResponse::default()) + } +} + +/// A container for all the configure hooks. It is just a collection +/// of callbacks that can be registered by the users of the +/// library. Based on this configuration we can then generate the +/// [`messages::GetManifestResponse`] from, populating our subscriptions +#[derive(Debug, Default)] +struct Hooks {} + +/// A container for all the configured notifications. +#[derive(Debug, Default)] +struct Subscriptions {} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn init() { + let builder = Builder::new((), tokio::io::stdin(), tokio::io::stdout()); + let plugin = builder.build(); + } +} diff --git a/plugins/src/messages.rs b/plugins/src/messages.rs new file mode 100644 index 000000000..fcba6ceae --- /dev/null +++ b/plugins/src/messages.rs @@ -0,0 +1,160 @@ +use serde::de::{self, Deserializer}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; +use std::fmt::Debug; + +#[derive(Deserialize, Debug)] +#[serde(tag = "method", content = "params")] +#[serde(rename_all = "snake_case")] +pub(crate) enum Request { + // Builtin + Getmanifest(GetManifestCall), + Init(InitCall), + + // Hooks + PeerConnected, + CommitmentRevocation, + DbWrite, + InvoicePayment, + Openchannel, + Openchannel2, + Openchannel2Changed, + Openchannel2Sign, + RbfChannel, + HtlcAccepted, + RpcCommand, + Custommsg, + OnionMessage, + OnionMessageBlinded, + OnionMessageOurpath, + + // Bitcoin backend + Getchaininfo, + Estimatefees, + Getrawblockbyheight, + Getutxout, + Sendrawtransaction, +} + +#[derive(Deserialize, Debug)] +#[serde(tag = "method", content = "params")] +#[serde(rename_all = "snake_case")] +pub(crate) enum Notification { + ChannelOpened, + ChannelOpenFailed, + ChannelStateChanged, + Connect, + Disconnect, + InvoicePayment, + InvoiceCreation, + Warning, + ForwardEvent, + SendpaySuccess, + SendpayFailure, + CoinMovement, + OpenchannelPeerSigs, + Shutdown, +} + +#[derive(Deserialize, Debug)] +pub struct GetManifestCall {} + +#[derive(Deserialize, Debug)] +pub struct InitCall { + pub options: Value, + pub configuration: HashMap, +} + +#[derive(Debug)] +pub enum JsonRpc { + Request(usize, R), + Notification(N), +} + +/// This function disentangles the various cases: +/// +/// 1) If we have an `id` then it is a request +/// +/// 2) Otherwise it's a notification that doesn't require a +/// response. +/// +/// Furthermore we distinguish between the built-in types and the +/// custom user notifications/methods: +/// +/// 1) We either match a built-in type above, +/// +/// 2) Or it's a custom one, so we pass it around just as a +/// `serde_json::Value` +impl<'de, N, R> Deserialize<'de> for JsonRpc +where + N: Deserialize<'de> + Debug, + R: Deserialize<'de> + Debug, +{ + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize, Debug)] + struct IdHelper { + id: Option, + } + + let v = Value::deserialize(deserializer)?; + let helper = IdHelper::deserialize(&v).map_err(de::Error::custom)?; + match helper.id { + Some(id) => { + let r = R::deserialize(v).map_err(de::Error::custom)?; + Ok(JsonRpc::Request(id, r)) + } + None => { + let n = N::deserialize(v).map_err(de::Error::custom)?; + Ok(JsonRpc::Notification(n)) + } + } + } +} + +use serde::ser::{SerializeStruct, Serializer}; + +impl Serialize for JsonRpc +where + N: Serialize + Debug, + R: Serialize + Debug, +{ + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + JsonRpc::Notification(r) => { + let r = serde_json::to_value(r).unwrap(); + let mut s = serializer.serialize_struct("Notification", 3)?; + s.serialize_field("jsonrpc", "2.0")?; + s.serialize_field("method", &r["method"])?; + s.serialize_field("params", &r["params"])?; + s.end() + } + JsonRpc::Request(id, r) => { + let r = serde_json::to_value(r).unwrap(); + let mut s = serializer.serialize_struct("Request", 4)?; + s.serialize_field("jsonrpc", "2.0")?; + s.serialize_field("id", id)?; + s.serialize_field("method", &r["method"])?; + s.serialize_field("params", &r["params"])?; + s.end() + } + } + } +} + +#[derive(Serialize, Default, Debug)] +pub struct GetManifestResponse { + options: Vec<()>, + rpcmethods: Vec<()>, +} + +#[derive(Serialize, Default, Debug)] +pub struct InitResponse {} + +pub trait Response: Serialize + Debug {}