From c2eadb88bef7c12413169fe4768fe5361b6399e3 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Sat, 22 Jul 2023 13:56:36 +0930 Subject: [PATCH] wait: new command to wait on indexes. This will initially be for listinvoices, but can be expanded to other list commands. It's documented, but it makes promises which currently don't exist: * listinvoice does not support `index` or `start` yet. * It doesn't actually fire when invoices change yet. Signed-off-by: Rusty Russell Changelog-Added: JSON-RPC: `wait`: new generic command to wait for events. --- doc/Makefile | 1 + doc/index.rst | 1 + doc/lightning-wait.7.md | 79 +++++++++++++++ lightningd/Makefile | 1 + lightningd/lightningd.c | 1 + lightningd/lightningd.h | 6 ++ lightningd/wait.c | 216 ++++++++++++++++++++++++++++++++++++++++ lightningd/wait.h | 56 +++++++++++ 8 files changed, 361 insertions(+) create mode 100644 doc/lightning-wait.7.md create mode 100644 lightningd/wait.c create mode 100644 lightningd/wait.h diff --git a/doc/Makefile b/doc/Makefile index f74c3fe56..6f4cab522 100644 --- a/doc/Makefile +++ b/doc/Makefile @@ -104,6 +104,7 @@ MANPAGES := doc/lightning-cli.1 \ doc/lightning-txsend.7 \ doc/lightning-unreserveinputs.7 \ doc/lightning-utxopsbt.7 \ + doc/lightning-wait.7 \ doc/lightning-waitinvoice.7 \ doc/lightning-waitanyinvoice.7 \ doc/lightning-waitblockheight.7 \ diff --git a/doc/index.rst b/doc/index.rst index 66a17fd42..0060662f9 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -140,6 +140,7 @@ Core Lightning Documentation lightning-unreserveinputs lightning-upgradewallet lightning-utxopsbt + lightning-wait lightning-waitanyinvoice lightning-waitblockheight lightning-waitinvoice diff --git a/doc/lightning-wait.7.md b/doc/lightning-wait.7.md new file mode 100644 index 000000000..aa83ed355 --- /dev/null +++ b/doc/lightning-wait.7.md @@ -0,0 +1,79 @@ +lightning-wait -- Command to wait for creations, changes and deletions +====================================================================== + +SYNOPSIS +-------- + +**wait** *subsystem* *indexname* *nextvalue* + +DESCRIPTION +----------- + +The **wait** RPC command returns once the index given by *indexname* +in *subsystem* reaches or exceeds *nextvalue*. All indexes start at 0, when no +events have happened (**wait** with a *nextvalue* of 0 is a way of getting +the current index, though naturally this is racy!). + +*indexname* is one of `created`, `updated` or `deleted`: +- `created` is incremented by one for every new object. +- `updated` is incremented by one every time an object is changed. +- `deleted` is incremented by one every time an object is deleted. + +*subsystem* is one of: +- `invoices`: corresponding to `listinvoices`. + + +RELIABILITY +----------- + +Indices can go forward by more than one; in particlar, if multiple +objects were created and the one deleted, you could see this effect. +Similarly, there are some places (e.g. invoice expiration) where we +can update multiple entries at once. + +Indices only monotoncally increase. + +USAGE +----- + +The **wait** RPC is used to track changes in the system. Consider +tracking invoices being paid or expiring. The simplest (and +inefficient method) would be: + +1. Call `listinvoices` to get the current state of all invoices, and + remember the highest `updated_index`. Say it was 5. +2. Call `wait invoices updated 6`. +3. When it returns, call `listinvoices` again to see what changed. + +This is obviously inefficient, so there are two optimizations: + +1. Call `listinvoices` with `index=updated` and `start=6` to only see invoices + with `updated_index` greater than or equal to 6. +2. `wait` itself may also return some limited subset of fields from the list + command (it can't do this in all cases); for `invoices` this is `label` + and `status`, allowing many callers to avoid the `listinvoices` call. + +RETURN VALUE +------------ +FIXME + +On error the returned object will contain `code` and `message` properties, +with `code` being one of the following: + +- -32602: If the given parameters are wrong. + +AUTHOR +------ + +Rusty Russell <> is mainly +responsible. + +SEE ALSO +-------- + +lightning-listinvoice(7) + +RESOURCES +--------- + +Main web site: diff --git a/lightningd/Makefile b/lightningd/Makefile index 0085da531..a12345ce9 100644 --- a/lightningd/Makefile +++ b/lightningd/Makefile @@ -39,6 +39,7 @@ LIGHTNINGD_SRC := \ lightningd/routehint.c \ lightningd/runes.c \ lightningd/subd.c \ + lightningd/wait.c \ lightningd/watch.c LIGHTNINGD_SRC_NOHDR := \ diff --git a/lightningd/lightningd.c b/lightningd/lightningd.c index c420337a5..5b4a63676 100644 --- a/lightningd/lightningd.c +++ b/lightningd/lightningd.c @@ -213,6 +213,7 @@ static struct lightningd *new_lightningd(const tal_t *ctx) list_head_init(&ld->ping_commands); list_head_init(&ld->disconnect_commands); list_head_init(&ld->waitblockheight_commands); + list_head_init(&ld->wait_commands); /*~ Tal also explicitly supports arrays: it stores the number of * elements, which can be accessed with tal_count() (or tal_bytelen() diff --git a/lightningd/lightningd.h b/lightningd/lightningd.h index ac70e252a..72c8e5ed5 100644 --- a/lightningd/lightningd.h +++ b/lightningd/lightningd.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -237,6 +238,8 @@ struct lightningd { struct list_head ping_commands; /* Outstanding disconnect commands. */ struct list_head disconnect_commands; + /* Outstanding wait commands */ + struct list_head wait_commands; /* Maintained by invoices.c */ struct invoices *invoices; @@ -265,6 +268,9 @@ struct lightningd { /* Announce names in config as DNS records (recently BOLT 7 addition) */ bool announce_dns; + /* Indexes used by all the wait infra */ + struct indexes indexes[NUM_WAIT_SUBSYSTEM]; + #if DEVELOPER /* If we want to debug a subdaemon/plugin. */ char *dev_debug_subprocess; diff --git a/lightningd/wait.c b/lightningd/wait.c new file mode 100644 index 000000000..70de05a9a --- /dev/null +++ b/lightningd/wait.c @@ -0,0 +1,216 @@ +/* Code to be notified when various standardized events happen. */ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include +#include + +struct waiter { + struct list_node list; + struct command *cmd; + /* These are pointers because of how param_ works */ + enum wait_subsystem *subsystem; + enum wait_index *index; + u64 *nextval; +}; + + +static const char *subsystem_names[] = { + "invoices", +}; + +static const char *index_names[] = { + "created", + "updated", + "deleted", +}; + +/* This is part of the API, so no changing! */ +const char *wait_index_name(enum wait_index index) +{ + switch (index) { + case WAIT_INDEX_CREATED: + case WAIT_INDEX_UPDATED: + case WAIT_INDEX_DELETED: + return index_names[index]; + } + abort(); +} + +const char *wait_subsystem_name(enum wait_subsystem subsystem) +{ + switch (subsystem) { + case WAIT_SUBSYSTEM_INVOICE: + return subsystem_names[subsystem]; + } + abort(); +} + +static u64 *wait_index_ptr(struct lightningd *ld, + enum wait_subsystem subsystem, + enum wait_index index) +{ + struct indexes *indexes; + + assert(subsystem < ARRAY_SIZE(ld->indexes)); + indexes = &ld->indexes[subsystem]; + + assert(index < ARRAY_SIZE(indexes->i)); + + return &indexes->i[index]; +} + +static void json_add_index(struct json_stream *response, + enum wait_subsystem subsystem, + enum wait_index index, + u64 val, + va_list *ap) +{ + const char *name, *value; + json_add_string(response, "subsystem", wait_subsystem_name(subsystem)); + json_add_u64(response, wait_index_name(index), val); + + if (!ap) + return; + + json_object_start(response, "details"); + while ((name = va_arg(*ap, const char *)) != NULL) { + value = va_arg(*ap, const char *); + if (!value) + continue; + + /* This is a hack! */ + if (name[0] == '=') { + /* Copy in literallty! */ + json_add_jsonstr(response, name + 1, value, strlen(value)); + } else { + json_add_string(response, name, value); + } + } + json_object_end(response); +} + +u64 wait_index_increment(struct lightningd *ld, + enum wait_subsystem subsystem, + enum wait_index index, + ...) +{ + struct waiter *i, *n; + va_list ap; + u64 *idxval = wait_index_ptr(ld, subsystem, index); + + assert(!add_overflows_u64(*idxval, 1)); + (*idxval)++; + + /* FIXME: We can optimize this! It's always the max of the fields in + * the table, *unless* we delete one. So we can lazily write this on + * delete, and fix it up to MAX() when we startup. */ + db_set_intvar(ld->wallet->db, + tal_fmt(tmpctx, "last_%s_%s_index", + wait_subsystem_name(subsystem), + wait_index_name(index)), + *idxval); + + list_for_each_safe(&ld->wait_commands, i, n, list) { + struct json_stream *response; + + if (*i->subsystem != subsystem) + continue; + if (*i->index != index) + continue; + if (*idxval < *i->nextval) + continue; + + response = json_stream_success(i->cmd); + va_start(ap, index); + json_add_index(response, subsystem, index, *idxval, &ap); + va_end(ap); + /* Delete before freeing */ + list_del_from(&ld->wait_commands, &i->list); + was_pending(command_success(i->cmd, response)); + } + + return *idxval; +} + +static struct command_result *param_subsystem(struct command *cmd, + const char *name, + const char *buffer, + const jsmntok_t *tok, + enum wait_subsystem **subsystem) +{ + for (size_t i = 0; i < ARRAY_SIZE(subsystem_names); i++) { + if (json_tok_streq(buffer, tok, subsystem_names[i])) { + *subsystem = tal(cmd, enum wait_subsystem); + **subsystem = i; + return NULL; + } + } + + return command_fail_badparam(cmd, name, buffer, tok, + "unknown subsystem"); +} + +struct command_result *param_index(struct command *cmd, + const char *name, + const char *buffer, + const jsmntok_t *tok, + enum wait_index **index) +{ + for (size_t i = 0; i < ARRAY_SIZE(index_names); i++) { + if (json_tok_streq(buffer, tok, index_names[i])) { + *index = tal(cmd, enum wait_index); + **index = i; + return NULL; + } + } + + return command_fail_badparam(cmd, name, buffer, tok, + "unknown index"); +} + +static struct command_result *json_wait(struct command *cmd, + const char *buffer, + const jsmntok_t *obj UNNEEDED, + const jsmntok_t *params) +{ + struct waiter *waiter = tal(cmd, struct waiter); + u64 val; + + if (!param(cmd, buffer, params, + p_req("subsystem", param_subsystem, + &waiter->subsystem), + p_req("indexname", param_index, &waiter->index), + p_req("nextvalue", param_u64, &waiter->nextval), + NULL)) + return command_param_failed(); + + /* Are we there already? Return immediately. */ + val = *wait_index_ptr(cmd->ld, *waiter->subsystem, *waiter->index); + if (val >= *waiter->nextval) { + struct json_stream *response; + + response = json_stream_success(cmd); + json_add_index(response, + *waiter->subsystem, + *waiter->index, + val, NULL); + return command_success(cmd, response); + } + + waiter->cmd = cmd; + list_add_tail(&cmd->ld->wait_commands, &waiter->list); + return command_still_pending(cmd); +} + +static const struct json_command wait_command = { + "wait", + "utility", + json_wait, + "Wait for {subsystem} {indexname} to reach or exceed {value})" +}; +AUTODATA(json_command, &wait_command); diff --git a/lightningd/wait.h b/lightningd/wait.h new file mode 100644 index 000000000..35f50d45c --- /dev/null +++ b/lightningd/wait.h @@ -0,0 +1,56 @@ +#ifndef LIGHTNING_LIGHTNINGD_WAIT_H +#define LIGHTNING_LIGHTNINGD_WAIT_H +#include "config.h" +#include + +struct lightningd; + +/* This WAIT_SUBSYSTEM_X corresponds to listX */ +enum wait_subsystem { + WAIT_SUBSYSTEM_INVOICE +}; +#define NUM_WAIT_SUBSYSTEM (WAIT_SUBSYSTEM_INVOICE+1) + +enum wait_index { + WAIT_INDEX_CREATED, + WAIT_INDEX_UPDATED, + WAIT_INDEX_DELETED, +}; +#define NUM_WAIT_INDEX (WAIT_INDEX_DELETED+1) + +/** + * structure for keeping created/updated/deleted indexes in the db + */ +struct indexes { + u64 i[NUM_WAIT_INDEX]; +}; + +/* Get a string */ +const char *wait_index_name(enum wait_index index); +const char *wait_subsystem_name(enum wait_subsystem subsystem); + +/** + * wait_index_increment - increment an index, tell waiters. + * @ld: the lightningd + * @subsystem: subsystem for index + * @index: which index + * ...: name/value pairs, followed by NULL. + * + * Increase index, write to db, wake any waiters, give them any name/value pairs. + * If the value is NULL, omit that name. + * If the name starts with '=', the value is a JSON literal (and skip over the =) + * + * Returns the updated index value (always > 0). + */ +u64 LAST_ARG_NULL wait_index_increment(struct lightningd *ld, + enum wait_subsystem subsystem, + enum wait_index index, + ...); + +/* For passing in index parameters. */ +struct command_result *param_index(struct command *cmd, const char *name, + const char *buffer, + const jsmntok_t *tok, + enum wait_index **index); + +#endif /* LIGHTNING_LIGHTNINGD_WAIT_H */