From f77e7898b6a9cdddca193df562b5d01b312b431c Mon Sep 17 00:00:00 2001 From: William Casarin Date: Sun, 10 Aug 2025 16:28:21 -0700 Subject: [PATCH] clndash: invoice loading Signed-off-by: William Casarin --- Cargo.lock | 7 +- Cargo.toml | 2 +- crates/notedeck_clndash/Cargo.toml | 5 +- crates/notedeck_clndash/src/event.rs | 71 +++++++ crates/notedeck_clndash/src/lib.rs | 288 +++++++++++++++++++-------- crates/notedeck_clndash/src/watch.rs | 198 ++++++++++++++++++ 6 files changed, 483 insertions(+), 88 deletions(-) create mode 100644 crates/notedeck_clndash/src/event.rs create mode 100644 crates/notedeck_clndash/src/watch.rs diff --git a/Cargo.lock b/Cargo.lock index 21d0db9..7727a2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3035,6 +3035,7 @@ dependencies = [ "bech32", "bitcoin", "lightning-types", + "serde", ] [[package]] @@ -3072,9 +3073,9 @@ checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5" [[package]] name = "lnsocket" -version = "0.4.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a373bcde8b65d6db11a0cd0f70dd4a24af854dd7a112b0a51258593c65f48ff" +checksum = "724c7fba2188a49ab31316e52dd410d4d3168b8e6482aa2ac3889dd840d28712" dependencies = [ "bitcoin", "hashbrown 0.13.2", @@ -3588,6 +3589,8 @@ version = "0.6.0" dependencies = [ "eframe", "egui", + "egui_extras", + "lightning-invoice", "lnsocket", "notedeck", "serde", diff --git a/Cargo.toml b/Cargo.toml index cb7abc8..c63a11b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ mime_guess = "2.0.5" pretty_assertions = "1.4.1" jni = "0.21.1" profiling = "1.0" -lightning-invoice = "0.33.1" +lightning-invoice = { version = "0.33.1", features = ["serde"] } secp256k1 = "0.30.0" hashbrown = "0.15.2" openai-api-rs = "6.0.3" diff --git a/crates/notedeck_clndash/Cargo.toml b/crates/notedeck_clndash/Cargo.toml index 8ecfd2a..ad09732 100644 --- a/crates/notedeck_clndash/Cargo.toml +++ b/crates/notedeck_clndash/Cargo.toml @@ -8,8 +8,11 @@ egui = { workspace = true } notedeck = { workspace = true } #notedeck_ui = { workspace = true } eframe = { workspace = true } -lnsocket = "0.4.0" tracing = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } serde = { workspace = true } +egui_extras = { workspace = true } +lightning-invoice = { workspace = true } + +lnsocket = "0.5.1" diff --git a/crates/notedeck_clndash/src/event.rs b/crates/notedeck_clndash/src/event.rs new file mode 100644 index 0000000..7743c76 --- /dev/null +++ b/crates/notedeck_clndash/src/event.rs @@ -0,0 +1,71 @@ +use lightning_invoice::Bolt11Invoice; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Serialize, Debug, Clone)] +pub struct WaitRequest { + pub indexname: String, + pub subsystem: String, + pub nextvalue: u64, +} + +#[derive(Clone, Debug)] +pub enum Request { + GetInfo, + ListPeerChannels, + PaidInvoices(u32), +} + +#[derive(Deserialize, Serialize)] +pub struct ListPeerChannel { + pub short_channel_id: String, + pub our_reserve_msat: i64, + pub to_us_msat: i64, + pub total_msat: i64, + pub their_reserve_msat: i64, +} + +pub struct Channel { + pub to_us: i64, + pub to_them: i64, + pub original: ListPeerChannel, +} + +pub struct Channels { + pub max_total_msat: i64, + pub avail_in: i64, + pub avail_out: i64, + pub channels: Vec, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct Invoice { + pub lastpay_index: Option, + pub label: String, + pub bolt11: Bolt11Invoice, + pub payment_hash: String, + pub amount_msat: u64, + pub status: String, + pub description: String, + pub expires_at: u64, + pub created_index: u64, + pub updated_index: u64, +} + +/// Responses from the socket +pub enum ClnResponse { + GetInfo(Value), + ListPeerChannels(Result), + PaidInvoices(Result, lnsocket::Error>), +} + +pub enum Event { + /// We lost the socket somehow + Ended { + reason: String, + }, + + Connected, + + Response(ClnResponse), +} diff --git a/crates/notedeck_clndash/src/lib.rs b/crates/notedeck_clndash/src/lib.rs index d6bced0..e7be479 100644 --- a/crates/notedeck_clndash/src/lib.rs +++ b/crates/notedeck_clndash/src/lib.rs @@ -1,32 +1,72 @@ +use crate::event::Channel; +use crate::event::Channels; +use crate::event::ClnResponse; +use crate::event::Event; +use crate::event::Invoice; +use crate::event::ListPeerChannel; +use crate::event::Request; +use crate::watch::fetch_paid_invoices; + use egui::{Color32, Label, RichText}; use lnsocket::bitcoin::secp256k1::{PublicKey, SecretKey, rand}; use lnsocket::{CommandoClient, LNSocket}; use notedeck::{AppAction, AppContext}; -use serde::{Deserialize, Serialize}; -use serde_json::{Value, json}; +use serde_json::json; use std::str::FromStr; +use std::sync::Arc; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; -struct Channel { - to_us: i64, - to_them: i64, - original: ListPeerChannel, +mod event; +mod watch; + +pub enum LoadingState { + Loading, + Failed(E), + Loaded(T), } -struct Channels { - max_total_msat: i64, - avail_in: i64, - avail_out: i64, - channels: Vec, +impl Default for LoadingState { + fn default() -> Self { + Self::Loading + } +} + +impl LoadingState { + fn _as_ref(&self) -> LoadingState<&T, &E> { + match self { + Self::Loading => LoadingState::<&T, &E>::Loading, + Self::Failed(err) => LoadingState::<&T, &E>::Failed(err), + Self::Loaded(t) => LoadingState::<&T, &E>::Loaded(t), + } + } + + fn from_result(res: Result) -> LoadingState { + match res { + Ok(r) => LoadingState::Loaded(r), + Err(err) => LoadingState::Failed(err), + } + } + + /* + fn unwrap(self) -> T { + let Self::Loaded(t) = self else { + panic!("unwrap in LoadingState"); + }; + + t + } + */ } #[derive(Default)] pub struct ClnDash { initialized: bool, connection_state: ConnectionState, - get_info: Option, - channels: Option>, + summary: LoadingState, + get_info: LoadingState, + channels: LoadingState, channel: Option, + invoices: LoadingState, lnsocket::Error>, last_summary: Option, } @@ -41,44 +81,12 @@ struct CommChannel { event_rx: UnboundedReceiver, } -/// Responses from the socket -enum ClnResponse { - GetInfo(Value), - ListPeerChannels(Result), -} - -#[derive(Deserialize, Serialize)] -struct ListPeerChannel { - short_channel_id: String, - our_reserve_msat: i64, - to_us_msat: i64, - total_msat: i64, - their_reserve_msat: i64, -} - enum ConnectionState { Dead(String), Connecting, Active, } -#[derive(Eq, PartialEq, Clone, Debug)] -enum Request { - GetInfo, - ListPeerChannels, -} - -enum Event { - /// We lost the socket somehow - Ended { - reason: String, - }, - - Connected, - - Response(ClnResponse), -} - impl notedeck::App for ClnDash { fn update(&mut self, _ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> Option { if !self.initialized { @@ -116,6 +124,25 @@ fn connection_state_ui(ui: &mut egui::Ui, state: &ConnectionState) { } } +fn summary_ui( + ui: &mut egui::Ui, + last_summary: Option<&Summary>, + summary: &LoadingState, +) { + match summary { + LoadingState::Loading => { + ui.label("loading summary"); + } + LoadingState::Failed(err) => { + ui.label(format!("Failed to get summary: {err}")); + } + LoadingState::Loaded(summary) => { + summary_cards_ui(ui, summary, last_summary); + ui.add_space(8.0); + } + } +} + impl ClnDash { fn show(&mut self, ui: &mut egui::Ui) { egui::Frame::new() @@ -123,16 +150,10 @@ impl ClnDash { .show(ui, |ui| { egui::ScrollArea::vertical().show(ui, |ui| { connection_state_ui(ui, &self.connection_state); - if let Some(Ok(ch)) = self.channels.as_ref() { - let summary = compute_summary(ch); - summary_cards_ui(ui, &summary, self.last_summary.as_ref()); - ui.add_space(8.0); - } + summary_ui(ui, self.last_summary.as_ref(), &self.summary); + invoices_ui(ui, &self.invoices); channels_ui(ui, &self.channels); - - if let Some(info) = self.get_info.as_ref() { - get_info_ui(ui, info); - } + get_info_ui(ui, &self.get_info); }); }); } @@ -167,7 +188,7 @@ impl ClnDash { let rune = std::env::var("RUNE").unwrap_or( "Vns1Zxvidr4J8pP2ZCg3Wjp2SyGyyf5RHgvFG8L36yM9MzMmbWV0aG9kPWdldGluZm8=".to_string(), ); - let commando = CommandoClient::spawn(lnsocket, &rune); + let commando = Arc::new(CommandoClient::spawn(lnsocket, &rune)); loop { match req_rx.recv().await { @@ -181,25 +202,47 @@ impl ClnDash { Some(req) => { tracing::debug!("calling {req:?}"); match req { - Request::GetInfo => match commando.call("getinfo", json!({})).await { - Ok(v) => { - let _ = event_tx.send(Event::Response(ClnResponse::GetInfo(v))); - } - Err(err) => { - tracing::error!("get_info error {}", err); - } - }, + Request::GetInfo => { + let event_tx = event_tx.clone(); + let commando = commando.clone(); + tokio::spawn(async move { + match commando.call("getinfo", json!({})).await { + Ok(v) => { + let _ = event_tx + .send(Event::Response(ClnResponse::GetInfo(v))); + } + Err(err) => { + tracing::error!("get_info error {}", err); + } + } + }); + } + + Request::PaidInvoices(n) => { + let event_tx = event_tx.clone(); + let commando = commando.clone(); + tokio::spawn(async move { + let invoices = fetch_paid_invoices(commando, n).await; + let _ = event_tx + .send(Event::Response(ClnResponse::PaidInvoices(invoices))); + }); + } Request::ListPeerChannels => { - let peer_channels = - commando.call("listpeerchannels", json!({})).await; - let channels = peer_channels.map(|v| { - let peer_channels: Vec = - serde_json::from_value(v["channels"].clone()).unwrap(); - to_channels(peer_channels) + let event_tx = event_tx.clone(); + let commando = commando.clone(); + tokio::spawn(async move { + let peer_channels = + commando.call("listpeerchannels", json!({})).await; + let channels = peer_channels.map(|v| { + let peer_channels: Vec = + serde_json::from_value(v["channels"].clone()).unwrap(); + to_channels(peer_channels) + }); + let _ = event_tx.send(Event::Response( + ClnResponse::ListPeerChannels(channels), + )); }); - let _ = event_tx - .send(Event::Response(ClnResponse::ListPeerChannels(channels))); } } } @@ -223,20 +266,30 @@ impl ClnDash { self.connection_state = ConnectionState::Active; let _ = channel.req_tx.send(Request::GetInfo); let _ = channel.req_tx.send(Request::ListPeerChannels); + let _ = channel.req_tx.send(Request::PaidInvoices(30)); } Event::Response(resp) => match resp { ClnResponse::ListPeerChannels(chans) => { - if let Some(Ok(prev)) = self.channels.as_ref() { + if let LoadingState::Loaded(prev) = &self.channels { self.last_summary = Some(compute_summary(prev)); } - self.channels = Some(chans); + + self.summary = match &chans { + Ok(chans) => LoadingState::Loaded(compute_summary(chans)), + Err(err) => LoadingState::Failed(err.clone()), + }; + self.channels = LoadingState::from_result(chans); } ClnResponse::GetInfo(value) => { - if let Ok(s) = serde_json::to_string_pretty(&value) { - self.get_info = Some(s); - } + let res = serde_json::to_string_pretty(&value); + self.get_info = + LoadingState::from_result(res.map_err(|_| lnsocket::Error::Json)); + } + + ClnResponse::PaidInvoices(invoices) => { + self.invoices = LoadingState::from_result(invoices); } }, } @@ -244,9 +297,15 @@ impl ClnDash { } } -fn get_info_ui(ui: &mut egui::Ui, info: &str) { - ui.horizontal_wrapped(|ui| { - ui.add(Label::new(info).wrap_mode(egui::TextWrapMode::Wrap)); +fn get_info_ui(ui: &mut egui::Ui, info: &LoadingState) { + ui.horizontal_wrapped(|ui| match info { + LoadingState::Loading => {} + LoadingState::Failed(err) => { + ui.label(format!("failed to fetch node info: {err}")); + } + LoadingState::Loaded(info) => { + ui.add(Label::new(info).wrap_mode(egui::TextWrapMode::Wrap)); + } }); } @@ -333,9 +392,25 @@ fn human_sat(msat: i64) -> String { } } -fn channels_ui(ui: &mut egui::Ui, channels: &Option>) { +fn human_verbose_sat(msat: i64) -> String { + if msat < 1_000 { + // less than 1 sat + format!("{msat} msat") + } else { + let sats = msat / 1_000; + if sats < 100_000_000 { + // less than 1 BTC + format!("{sats} sat") + } else { + let btc = sats / 100_000_000; + format!("{btc} BTC") + } + } +} + +fn channels_ui(ui: &mut egui::Ui, channels: &LoadingState) { match channels { - Some(Ok(channels)) => { + LoadingState::Loaded(channels) => { if channels.channels.is_empty() { ui.label("no channels yet..."); return; @@ -348,11 +423,11 @@ fn channels_ui(ui: &mut egui::Ui, channels: &Option { + LoadingState::Failed(err) => { ui.label(format!("error fetching channels: {err}")); } - None => { - ui.label("no channels yet..."); + LoadingState::Loading => { + ui.label("fetching channels..."); } } } @@ -524,3 +599,48 @@ fn delta_str(new: i64, old: i64) -> String { std::cmp::Ordering::Equal => "·".into(), } } + +fn invoices_ui(ui: &mut egui::Ui, invoices: &LoadingState, lnsocket::Error>) { + match invoices { + LoadingState::Loading => { + ui.label("loading invoices..."); + } + + LoadingState::Failed(err) => { + ui.label(format!("failed to load invoices: {err}")); + } + + LoadingState::Loaded(invoices) => { + use egui_extras::{Column, TableBuilder}; + + TableBuilder::new(ui) + .column(Column::auto().resizable(true)) + .column(Column::remainder()) + .header(20.0, |mut header| { + header.col(|ui| { + ui.heading("Description"); + }); + header.col(|ui| { + ui.heading("Amount"); + }); + }) + .body(|mut body| { + for invoice in invoices { + body.row(30.0, |mut row| { + row.col(|ui| { + ui.label(invoice.description.clone()); + }); + row.col(|ui| match invoice.bolt11.amount_milli_satoshis() { + None => { + ui.label("any"); + } + Some(amt) => { + ui.label(human_verbose_sat(amt as i64)); + } + }); + }); + } + }); + } + } +} diff --git a/crates/notedeck_clndash/src/watch.rs b/crates/notedeck_clndash/src/watch.rs new file mode 100644 index 0000000..a674eef --- /dev/null +++ b/crates/notedeck_clndash/src/watch.rs @@ -0,0 +1,198 @@ +use crate::event::Invoice; +use lnsocket::CallOpts; +use lnsocket::CommandoClient; +use serde::Deserialize; +use serde_json::json; +use std::sync::Arc; + +#[derive(Deserialize)] +struct UpdatedInvoicesResponse { + updated: u64, +} + +#[derive(Deserialize)] +struct PayIndexInvoices { + invoices: Vec, +} + +#[derive(Deserialize)] +struct PayIndexScan { + pay_index: Option, +} + +async fn find_lastpay_index(commando: Arc) -> Result, lnsocket::Error> { + const PAGE: u64 = 250; + // 1) get the current updated tail + let created_value = commando + .call( + "wait", + json!({"subsystem":"invoices","indexname":"updated","nextvalue":0}), + ) + .await?; + let response: UpdatedInvoicesResponse = + serde_json::from_value(created_value).map_err(|_| lnsocket::Error::Json)?; + + // start our window at the tail + let mut start_at = response + .updated + .saturating_add(1) // +1 because we want max(1, updated - PAGE + 1) + .saturating_sub(PAGE) + .max(1); + + loop { + // 2) fetch a window (indexed by "updated") + let val = commando + .call_with_opts( + "listinvoices", + json!({ + "index": "updated", + "start": start_at, + "limit": PAGE, + }), + // only fetch the one field we care about + CallOpts::default().filter(json!({ + "invoices": [{"pay_index": true}] + })), + ) + .await?; + + let parsed: PayIndexInvoices = + serde_json::from_value(val).map_err(|_| lnsocket::Error::Json)?; + + if let Some(pi) = parsed.invoices.iter().filter_map(|inv| inv.pay_index).max() { + return Ok(Some(pi)); + } + + // 4) no paid invoice in this slice—step back or bail + if start_at == 1 { + return Ok(None); + } + + start_at = start_at.saturating_sub(PAGE).max(1); + } +} + +pub async fn fetch_paid_invoices( + commando: Arc, + limit: u32, +) -> Result, lnsocket::Error> { + use tokio::task::JoinSet; + + // look for an invoice with the last paid index + let Some(lastpay_index) = find_lastpay_index(commando.clone()).await? else { + // no paid invoices + return Ok(vec![]); + }; + + let mut set: JoinSet> = JoinSet::new(); + let start = lastpay_index.saturating_sub(limit as u64); + + // 3) Fire off at most `concurrency` `waitanyinvoice` calls at a time, + // collect all successful responses into a Vec. + // fire them ALL at once + for idx in start..lastpay_index { + let c = commando.clone(); + set.spawn(async move { + let val = c + .call( + "waitanyinvoice", + serde_json::json!({ "lastpay_index": idx }), + ) + .await?; + let parsed: Invoice = serde_json::from_value(val).map_err(|_| lnsocket::Error::Json)?; + Ok(parsed) + }); + } + + let mut results = Vec::with_capacity(limit as usize); + while let Some(res) = set.join_next().await { + results.push(res.map_err(|_| lnsocket::Error::Io(std::io::ErrorKind::Interrupted))??); + } + + results.sort_by(|a, b| a.updated_index.cmp(&b.updated_index)); + + Ok(results) +} + +// wip watch subsystem +/* +async fn watch_subsystem( + commando: CommandoClient, + subsystem: WaitSubsystem, + index: WaitIndex, + event_tx: UnboundedSender, + mut cancel_rx: Receiver<()>, +) { + // Step 1: Fetch current index value so we can back up ~20 + let mut nextvalue: u64 = match commando + .call( + "wait", + serde_json::json!({ + "indexname": index.as_str(), + "subsystem": subsystem.as_str(), + "nextvalue": 0 + }), + ) + .await + { + Ok(v) => { + // You showed the result has `updated` as the current highest index + let current = v.get("updated").and_then(|x| x.as_u64()).unwrap_or(0); + current.saturating_sub(20) // back up 20, clamp at 0 + } + Err(err) => { + tracing::warn!("initial wait(…nextvalue=0) failed: {}", err); + 0 + } + }; + + loop { + // You can add a timeout to avoid hanging forever in weird network states. + let fut = commando.call( + "wait", + serde_json::to_value(WaitRequest { + indexname: "invoices".into(), + subsystem: "lightningd".into(), + nextvalue, + }) + .unwrap(), + ); + + tokio::select! { + _ = &mut cancel_rx => { + // graceful shutdown + break; + } + + res = fut => { + match res { + Ok(v) => { + // Typical shape: { "nextvalue": n, "invoicestatus": { ... } } (varies by plugin/index) + // Adjust these lookups for your node’s actual wait payload. + if let Some(nv) = v.get("nextvalue").and_then(|x| x.as_u64()) { + nextvalue = nv + 1; + } else { + // Defensive: never get stuck — bump at least by 1 + nextvalue += 1; + } + + // Inspect/route + let kind = v.get("status").and_then(|s| s.as_str()); + let ev = match kind { + Some("paid") => ClnResponse::Invoice(InvoiceEvent::Paid(v.clone())), + Some("created") => ClnResponse::Invoice(InvoiceEvent::Created(v.clone())), + _ => ClnResponse::Invoice(InvoiceEvent::Other(v.clone())), + }; + let _ = event_tx.send(Event::Response(ev)); + } + Err(err) => { + tracing::warn!("wait(invoices) error: {err}"); + // small backoff so we don't tight-loop on persistent errors + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + } + } + } + } +} +*/