clndash: invoice loading

Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2025-08-10 16:28:21 -07:00
parent 8138a0a1ca
commit f77e7898b6
6 changed files with 483 additions and 88 deletions

View File

@@ -8,8 +8,11 @@ egui = { workspace = true }
notedeck = { workspace = true }
#notedeck_ui = { workspace = true }
eframe = { workspace = true }
lnsocket = "0.4.0"
tracing = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
egui_extras = { workspace = true }
lightning-invoice = { workspace = true }
lnsocket = "0.5.1"

View File

@@ -0,0 +1,71 @@
use lightning_invoice::Bolt11Invoice;
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Serialize, Debug, Clone)]
pub struct WaitRequest {
pub indexname: String,
pub subsystem: String,
pub nextvalue: u64,
}
#[derive(Clone, Debug)]
pub enum Request {
GetInfo,
ListPeerChannels,
PaidInvoices(u32),
}
#[derive(Deserialize, Serialize)]
pub struct ListPeerChannel {
pub short_channel_id: String,
pub our_reserve_msat: i64,
pub to_us_msat: i64,
pub total_msat: i64,
pub their_reserve_msat: i64,
}
pub struct Channel {
pub to_us: i64,
pub to_them: i64,
pub original: ListPeerChannel,
}
pub struct Channels {
pub max_total_msat: i64,
pub avail_in: i64,
pub avail_out: i64,
pub channels: Vec<Channel>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Invoice {
pub lastpay_index: Option<u64>,
pub label: String,
pub bolt11: Bolt11Invoice,
pub payment_hash: String,
pub amount_msat: u64,
pub status: String,
pub description: String,
pub expires_at: u64,
pub created_index: u64,
pub updated_index: u64,
}
/// Responses from the socket
pub enum ClnResponse {
GetInfo(Value),
ListPeerChannels(Result<Channels, lnsocket::Error>),
PaidInvoices(Result<Vec<Invoice>, lnsocket::Error>),
}
pub enum Event {
/// We lost the socket somehow
Ended {
reason: String,
},
Connected,
Response(ClnResponse),
}

View File

@@ -1,32 +1,72 @@
use crate::event::Channel;
use crate::event::Channels;
use crate::event::ClnResponse;
use crate::event::Event;
use crate::event::Invoice;
use crate::event::ListPeerChannel;
use crate::event::Request;
use crate::watch::fetch_paid_invoices;
use egui::{Color32, Label, RichText};
use lnsocket::bitcoin::secp256k1::{PublicKey, SecretKey, rand};
use lnsocket::{CommandoClient, LNSocket};
use notedeck::{AppAction, AppContext};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use serde_json::json;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
struct Channel {
to_us: i64,
to_them: i64,
original: ListPeerChannel,
mod event;
mod watch;
pub enum LoadingState<T, E> {
Loading,
Failed(E),
Loaded(T),
}
struct Channels {
max_total_msat: i64,
avail_in: i64,
avail_out: i64,
channels: Vec<Channel>,
impl<T, E> Default for LoadingState<T, E> {
fn default() -> Self {
Self::Loading
}
}
impl<T, E> LoadingState<T, E> {
fn _as_ref(&self) -> LoadingState<&T, &E> {
match self {
Self::Loading => LoadingState::<&T, &E>::Loading,
Self::Failed(err) => LoadingState::<&T, &E>::Failed(err),
Self::Loaded(t) => LoadingState::<&T, &E>::Loaded(t),
}
}
fn from_result(res: Result<T, E>) -> LoadingState<T, E> {
match res {
Ok(r) => LoadingState::Loaded(r),
Err(err) => LoadingState::Failed(err),
}
}
/*
fn unwrap(self) -> T {
let Self::Loaded(t) = self else {
panic!("unwrap in LoadingState");
};
t
}
*/
}
#[derive(Default)]
pub struct ClnDash {
initialized: bool,
connection_state: ConnectionState,
get_info: Option<String>,
channels: Option<Result<Channels, lnsocket::Error>>,
summary: LoadingState<Summary, lnsocket::Error>,
get_info: LoadingState<String, lnsocket::Error>,
channels: LoadingState<Channels, lnsocket::Error>,
channel: Option<CommChannel>,
invoices: LoadingState<Vec<Invoice>, lnsocket::Error>,
last_summary: Option<Summary>,
}
@@ -41,44 +81,12 @@ struct CommChannel {
event_rx: UnboundedReceiver<Event>,
}
/// Responses from the socket
enum ClnResponse {
GetInfo(Value),
ListPeerChannels(Result<Channels, lnsocket::Error>),
}
#[derive(Deserialize, Serialize)]
struct ListPeerChannel {
short_channel_id: String,
our_reserve_msat: i64,
to_us_msat: i64,
total_msat: i64,
their_reserve_msat: i64,
}
enum ConnectionState {
Dead(String),
Connecting,
Active,
}
#[derive(Eq, PartialEq, Clone, Debug)]
enum Request {
GetInfo,
ListPeerChannels,
}
enum Event {
/// We lost the socket somehow
Ended {
reason: String,
},
Connected,
Response(ClnResponse),
}
impl notedeck::App for ClnDash {
fn update(&mut self, _ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> Option<AppAction> {
if !self.initialized {
@@ -116,6 +124,25 @@ fn connection_state_ui(ui: &mut egui::Ui, state: &ConnectionState) {
}
}
fn summary_ui(
ui: &mut egui::Ui,
last_summary: Option<&Summary>,
summary: &LoadingState<Summary, lnsocket::Error>,
) {
match summary {
LoadingState::Loading => {
ui.label("loading summary");
}
LoadingState::Failed(err) => {
ui.label(format!("Failed to get summary: {err}"));
}
LoadingState::Loaded(summary) => {
summary_cards_ui(ui, summary, last_summary);
ui.add_space(8.0);
}
}
}
impl ClnDash {
fn show(&mut self, ui: &mut egui::Ui) {
egui::Frame::new()
@@ -123,16 +150,10 @@ impl ClnDash {
.show(ui, |ui| {
egui::ScrollArea::vertical().show(ui, |ui| {
connection_state_ui(ui, &self.connection_state);
if let Some(Ok(ch)) = self.channels.as_ref() {
let summary = compute_summary(ch);
summary_cards_ui(ui, &summary, self.last_summary.as_ref());
ui.add_space(8.0);
}
summary_ui(ui, self.last_summary.as_ref(), &self.summary);
invoices_ui(ui, &self.invoices);
channels_ui(ui, &self.channels);
if let Some(info) = self.get_info.as_ref() {
get_info_ui(ui, info);
}
get_info_ui(ui, &self.get_info);
});
});
}
@@ -167,7 +188,7 @@ impl ClnDash {
let rune = std::env::var("RUNE").unwrap_or(
"Vns1Zxvidr4J8pP2ZCg3Wjp2SyGyyf5RHgvFG8L36yM9MzMmbWV0aG9kPWdldGluZm8=".to_string(),
);
let commando = CommandoClient::spawn(lnsocket, &rune);
let commando = Arc::new(CommandoClient::spawn(lnsocket, &rune));
loop {
match req_rx.recv().await {
@@ -181,25 +202,47 @@ impl ClnDash {
Some(req) => {
tracing::debug!("calling {req:?}");
match req {
Request::GetInfo => match commando.call("getinfo", json!({})).await {
Ok(v) => {
let _ = event_tx.send(Event::Response(ClnResponse::GetInfo(v)));
}
Err(err) => {
tracing::error!("get_info error {}", err);
}
},
Request::GetInfo => {
let event_tx = event_tx.clone();
let commando = commando.clone();
tokio::spawn(async move {
match commando.call("getinfo", json!({})).await {
Ok(v) => {
let _ = event_tx
.send(Event::Response(ClnResponse::GetInfo(v)));
}
Err(err) => {
tracing::error!("get_info error {}", err);
}
}
});
}
Request::PaidInvoices(n) => {
let event_tx = event_tx.clone();
let commando = commando.clone();
tokio::spawn(async move {
let invoices = fetch_paid_invoices(commando, n).await;
let _ = event_tx
.send(Event::Response(ClnResponse::PaidInvoices(invoices)));
});
}
Request::ListPeerChannels => {
let peer_channels =
commando.call("listpeerchannels", json!({})).await;
let channels = peer_channels.map(|v| {
let peer_channels: Vec<ListPeerChannel> =
serde_json::from_value(v["channels"].clone()).unwrap();
to_channels(peer_channels)
let event_tx = event_tx.clone();
let commando = commando.clone();
tokio::spawn(async move {
let peer_channels =
commando.call("listpeerchannels", json!({})).await;
let channels = peer_channels.map(|v| {
let peer_channels: Vec<ListPeerChannel> =
serde_json::from_value(v["channels"].clone()).unwrap();
to_channels(peer_channels)
});
let _ = event_tx.send(Event::Response(
ClnResponse::ListPeerChannels(channels),
));
});
let _ = event_tx
.send(Event::Response(ClnResponse::ListPeerChannels(channels)));
}
}
}
@@ -223,20 +266,30 @@ impl ClnDash {
self.connection_state = ConnectionState::Active;
let _ = channel.req_tx.send(Request::GetInfo);
let _ = channel.req_tx.send(Request::ListPeerChannels);
let _ = channel.req_tx.send(Request::PaidInvoices(30));
}
Event::Response(resp) => match resp {
ClnResponse::ListPeerChannels(chans) => {
if let Some(Ok(prev)) = self.channels.as_ref() {
if let LoadingState::Loaded(prev) = &self.channels {
self.last_summary = Some(compute_summary(prev));
}
self.channels = Some(chans);
self.summary = match &chans {
Ok(chans) => LoadingState::Loaded(compute_summary(chans)),
Err(err) => LoadingState::Failed(err.clone()),
};
self.channels = LoadingState::from_result(chans);
}
ClnResponse::GetInfo(value) => {
if let Ok(s) = serde_json::to_string_pretty(&value) {
self.get_info = Some(s);
}
let res = serde_json::to_string_pretty(&value);
self.get_info =
LoadingState::from_result(res.map_err(|_| lnsocket::Error::Json));
}
ClnResponse::PaidInvoices(invoices) => {
self.invoices = LoadingState::from_result(invoices);
}
},
}
@@ -244,9 +297,15 @@ impl ClnDash {
}
}
fn get_info_ui(ui: &mut egui::Ui, info: &str) {
ui.horizontal_wrapped(|ui| {
ui.add(Label::new(info).wrap_mode(egui::TextWrapMode::Wrap));
fn get_info_ui(ui: &mut egui::Ui, info: &LoadingState<String, lnsocket::Error>) {
ui.horizontal_wrapped(|ui| match info {
LoadingState::Loading => {}
LoadingState::Failed(err) => {
ui.label(format!("failed to fetch node info: {err}"));
}
LoadingState::Loaded(info) => {
ui.add(Label::new(info).wrap_mode(egui::TextWrapMode::Wrap));
}
});
}
@@ -333,9 +392,25 @@ fn human_sat(msat: i64) -> String {
}
}
fn channels_ui(ui: &mut egui::Ui, channels: &Option<Result<Channels, lnsocket::Error>>) {
fn human_verbose_sat(msat: i64) -> String {
if msat < 1_000 {
// less than 1 sat
format!("{msat} msat")
} else {
let sats = msat / 1_000;
if sats < 100_000_000 {
// less than 1 BTC
format!("{sats} sat")
} else {
let btc = sats / 100_000_000;
format!("{btc} BTC")
}
}
}
fn channels_ui(ui: &mut egui::Ui, channels: &LoadingState<Channels, lnsocket::Error>) {
match channels {
Some(Ok(channels)) => {
LoadingState::Loaded(channels) => {
if channels.channels.is_empty() {
ui.label("no channels yet...");
return;
@@ -348,11 +423,11 @@ fn channels_ui(ui: &mut egui::Ui, channels: &Option<Result<Channels, lnsocket::E
ui.label(format!("available out {}", human_sat(channels.avail_out)));
ui.label(format!("available in {}", human_sat(channels.avail_in)));
}
Some(Err(err)) => {
LoadingState::Failed(err) => {
ui.label(format!("error fetching channels: {err}"));
}
None => {
ui.label("no channels yet...");
LoadingState::Loading => {
ui.label("fetching channels...");
}
}
}
@@ -524,3 +599,48 @@ fn delta_str(new: i64, old: i64) -> String {
std::cmp::Ordering::Equal => "·".into(),
}
}
fn invoices_ui(ui: &mut egui::Ui, invoices: &LoadingState<Vec<Invoice>, lnsocket::Error>) {
match invoices {
LoadingState::Loading => {
ui.label("loading invoices...");
}
LoadingState::Failed(err) => {
ui.label(format!("failed to load invoices: {err}"));
}
LoadingState::Loaded(invoices) => {
use egui_extras::{Column, TableBuilder};
TableBuilder::new(ui)
.column(Column::auto().resizable(true))
.column(Column::remainder())
.header(20.0, |mut header| {
header.col(|ui| {
ui.heading("Description");
});
header.col(|ui| {
ui.heading("Amount");
});
})
.body(|mut body| {
for invoice in invoices {
body.row(30.0, |mut row| {
row.col(|ui| {
ui.label(invoice.description.clone());
});
row.col(|ui| match invoice.bolt11.amount_milli_satoshis() {
None => {
ui.label("any");
}
Some(amt) => {
ui.label(human_verbose_sat(amt as i64));
}
});
});
}
});
}
}
}

View File

@@ -0,0 +1,198 @@
use crate::event::Invoice;
use lnsocket::CallOpts;
use lnsocket::CommandoClient;
use serde::Deserialize;
use serde_json::json;
use std::sync::Arc;
#[derive(Deserialize)]
struct UpdatedInvoicesResponse {
updated: u64,
}
#[derive(Deserialize)]
struct PayIndexInvoices {
invoices: Vec<PayIndexScan>,
}
#[derive(Deserialize)]
struct PayIndexScan {
pay_index: Option<u64>,
}
async fn find_lastpay_index(commando: Arc<CommandoClient>) -> Result<Option<u64>, lnsocket::Error> {
const PAGE: u64 = 250;
// 1) get the current updated tail
let created_value = commando
.call(
"wait",
json!({"subsystem":"invoices","indexname":"updated","nextvalue":0}),
)
.await?;
let response: UpdatedInvoicesResponse =
serde_json::from_value(created_value).map_err(|_| lnsocket::Error::Json)?;
// start our window at the tail
let mut start_at = response
.updated
.saturating_add(1) // +1 because we want max(1, updated - PAGE + 1)
.saturating_sub(PAGE)
.max(1);
loop {
// 2) fetch a window (indexed by "updated")
let val = commando
.call_with_opts(
"listinvoices",
json!({
"index": "updated",
"start": start_at,
"limit": PAGE,
}),
// only fetch the one field we care about
CallOpts::default().filter(json!({
"invoices": [{"pay_index": true}]
})),
)
.await?;
let parsed: PayIndexInvoices =
serde_json::from_value(val).map_err(|_| lnsocket::Error::Json)?;
if let Some(pi) = parsed.invoices.iter().filter_map(|inv| inv.pay_index).max() {
return Ok(Some(pi));
}
// 4) no paid invoice in this slice—step back or bail
if start_at == 1 {
return Ok(None);
}
start_at = start_at.saturating_sub(PAGE).max(1);
}
}
pub async fn fetch_paid_invoices(
commando: Arc<CommandoClient>,
limit: u32,
) -> Result<Vec<Invoice>, lnsocket::Error> {
use tokio::task::JoinSet;
// look for an invoice with the last paid index
let Some(lastpay_index) = find_lastpay_index(commando.clone()).await? else {
// no paid invoices
return Ok(vec![]);
};
let mut set: JoinSet<Result<Invoice, lnsocket::Error>> = JoinSet::new();
let start = lastpay_index.saturating_sub(limit as u64);
// 3) Fire off at most `concurrency` `waitanyinvoice` calls at a time,
// collect all successful responses into a Vec.
// fire them ALL at once
for idx in start..lastpay_index {
let c = commando.clone();
set.spawn(async move {
let val = c
.call(
"waitanyinvoice",
serde_json::json!({ "lastpay_index": idx }),
)
.await?;
let parsed: Invoice = serde_json::from_value(val).map_err(|_| lnsocket::Error::Json)?;
Ok(parsed)
});
}
let mut results = Vec::with_capacity(limit as usize);
while let Some(res) = set.join_next().await {
results.push(res.map_err(|_| lnsocket::Error::Io(std::io::ErrorKind::Interrupted))??);
}
results.sort_by(|a, b| a.updated_index.cmp(&b.updated_index));
Ok(results)
}
// wip watch subsystem
/*
async fn watch_subsystem(
commando: CommandoClient,
subsystem: WaitSubsystem,
index: WaitIndex,
event_tx: UnboundedSender<Event>,
mut cancel_rx: Receiver<()>,
) {
// Step 1: Fetch current index value so we can back up ~20
let mut nextvalue: u64 = match commando
.call(
"wait",
serde_json::json!({
"indexname": index.as_str(),
"subsystem": subsystem.as_str(),
"nextvalue": 0
}),
)
.await
{
Ok(v) => {
// You showed the result has `updated` as the current highest index
let current = v.get("updated").and_then(|x| x.as_u64()).unwrap_or(0);
current.saturating_sub(20) // back up 20, clamp at 0
}
Err(err) => {
tracing::warn!("initial wait(…nextvalue=0) failed: {}", err);
0
}
};
loop {
// You can add a timeout to avoid hanging forever in weird network states.
let fut = commando.call(
"wait",
serde_json::to_value(WaitRequest {
indexname: "invoices".into(),
subsystem: "lightningd".into(),
nextvalue,
})
.unwrap(),
);
tokio::select! {
_ = &mut cancel_rx => {
// graceful shutdown
break;
}
res = fut => {
match res {
Ok(v) => {
// Typical shape: { "nextvalue": n, "invoicestatus": { ... } } (varies by plugin/index)
// Adjust these lookups for your nodes actual wait payload.
if let Some(nv) = v.get("nextvalue").and_then(|x| x.as_u64()) {
nextvalue = nv + 1;
} else {
// Defensive: never get stuck — bump at least by 1
nextvalue += 1;
}
// Inspect/route
let kind = v.get("status").and_then(|s| s.as_str());
let ev = match kind {
Some("paid") => ClnResponse::Invoice(InvoiceEvent::Paid(v.clone())),
Some("created") => ClnResponse::Invoice(InvoiceEvent::Created(v.clone())),
_ => ClnResponse::Invoice(InvoiceEvent::Other(v.clone())),
};
let _ = event_tx.send(Event::Response(ev));
}
Err(err) => {
tracing::warn!("wait(invoices) error: {err}");
// small backoff so we don't tight-loop on persistent errors
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
}
}
}
}
*/