mirror of
https://github.com/aljazceru/lightning.git
synced 2025-12-19 15:14:23 +01:00
libplugin: include the rpc conn into the global state
And rename the struct name, as it's now only used for RPC.
This commit is contained in:
@@ -3,7 +3,6 @@
|
||||
#include <ccan/intmap/intmap.h>
|
||||
#include <ccan/io/io.h>
|
||||
#include <ccan/json_out/json_out.h>
|
||||
#include <ccan/membuf/membuf.h>
|
||||
#include <ccan/read_write_all/read_write_all.h>
|
||||
#include <ccan/strmap/strmap.h>
|
||||
#include <ccan/tal/str/str.h>
|
||||
@@ -39,56 +38,11 @@ bool deprecated_apis;
|
||||
|
||||
extern const struct chainparams *chainparams;
|
||||
|
||||
struct plugin {
|
||||
/* lightningd interaction */
|
||||
struct io_conn *stdin_conn;
|
||||
struct io_conn *stdout_conn;
|
||||
|
||||
/* To read from lightningd */
|
||||
char *buffer;
|
||||
size_t used, len_read;
|
||||
|
||||
/* To write to lightningd */
|
||||
struct json_stream **js_arr;
|
||||
|
||||
enum plugin_restartability restartability;
|
||||
const struct plugin_command *commands;
|
||||
size_t num_commands;
|
||||
const struct plugin_notification *notif_subs;
|
||||
size_t num_notif_subs;
|
||||
const struct plugin_hook *hook_subs;
|
||||
size_t num_hook_subs;
|
||||
struct plugin_option *opts;
|
||||
|
||||
/* Anything special to do at init ? */
|
||||
void (*init)(struct plugin_conn *,
|
||||
const char *buf, const jsmntok_t *);
|
||||
/* Has the manifest been sent already ? */
|
||||
bool manifested;
|
||||
/* Has init been received ? */
|
||||
bool initialized;
|
||||
};
|
||||
|
||||
struct plugin_timer {
|
||||
struct timer timer;
|
||||
struct command_result *(*cb)(void);
|
||||
};
|
||||
|
||||
struct plugin_conn {
|
||||
int fd;
|
||||
MEMBUF(char) mb;
|
||||
};
|
||||
|
||||
/* Connection to make RPC requests. */
|
||||
static struct plugin_conn rpc_conn;
|
||||
|
||||
struct command {
|
||||
u64 *id;
|
||||
const char *methodname;
|
||||
bool usage_only;
|
||||
struct plugin *plugin;
|
||||
};
|
||||
|
||||
struct out_req {
|
||||
/* The unique id of this request. */
|
||||
u64 id;
|
||||
@@ -109,7 +63,7 @@ struct out_req {
|
||||
|
||||
/* command_result is mainly used as a compile-time check to encourage you
|
||||
* to return as soon as you get one (and not risk use-after-free of command).
|
||||
* Here we use two values: complete (cmd freed) an pending (still going) */
|
||||
* Here we use two values: complete (cmd freed) and pending (still going) */
|
||||
struct command_result {
|
||||
char c;
|
||||
};
|
||||
@@ -177,30 +131,30 @@ static void *membuf_tal_realloc(struct membuf *mb, void *rawelems,
|
||||
return p;
|
||||
}
|
||||
|
||||
static int read_json(struct plugin_conn *conn)
|
||||
static int read_json_from_rpc(struct plugin *p)
|
||||
{
|
||||
char *end;
|
||||
|
||||
/* We rely on the double-\n marker which only terminates JSON top
|
||||
* levels. Thanks lightningd! */
|
||||
while ((end = memmem(membuf_elems(&conn->mb),
|
||||
membuf_num_elems(&conn->mb), "\n\n", 2))
|
||||
while ((end = memmem(membuf_elems(&p->rpc_conn.mb),
|
||||
membuf_num_elems(&p->rpc_conn.mb), "\n\n", 2))
|
||||
== NULL) {
|
||||
ssize_t r;
|
||||
|
||||
/* Make sure we've room for at least READ_CHUNKSIZE. */
|
||||
membuf_prepare_space(&conn->mb, READ_CHUNKSIZE);
|
||||
r = read(conn->fd, membuf_space(&conn->mb),
|
||||
membuf_num_space(&conn->mb));
|
||||
membuf_prepare_space(&p->rpc_conn.mb, READ_CHUNKSIZE);
|
||||
r = read(p->rpc_conn.fd, membuf_space(&p->rpc_conn.mb),
|
||||
membuf_num_space(&p->rpc_conn.mb));
|
||||
/* lightningd goes away, we go away. */
|
||||
if (r == 0)
|
||||
exit(0);
|
||||
if (r < 0)
|
||||
plugin_err("Reading JSON input: %s", strerror(errno));
|
||||
membuf_added(&conn->mb, r);
|
||||
membuf_added(&p->rpc_conn.mb, r);
|
||||
}
|
||||
|
||||
return end + 2 - membuf_elems(&conn->mb);
|
||||
return end + 2 - membuf_elems(&p->rpc_conn.mb);
|
||||
}
|
||||
|
||||
/* This starts a JSON RPC message with boilerplate */
|
||||
@@ -355,7 +309,7 @@ void command_set_usage(struct command *cmd, const char *usage TAKES)
|
||||
/* Reads rpc reply and returns tokens, setting contents to 'error' or
|
||||
* 'result' (depending on *error). */
|
||||
static const jsmntok_t *read_rpc_reply(const tal_t *ctx,
|
||||
struct plugin_conn *rpc,
|
||||
struct plugin *plugin,
|
||||
const jsmntok_t **contents,
|
||||
bool *error,
|
||||
int *reqlen)
|
||||
@@ -363,22 +317,22 @@ static const jsmntok_t *read_rpc_reply(const tal_t *ctx,
|
||||
const jsmntok_t *toks;
|
||||
bool valid;
|
||||
|
||||
*reqlen = read_json(rpc);
|
||||
*reqlen = read_json_from_rpc(plugin);
|
||||
|
||||
toks = json_parse_input(ctx, membuf_elems(&rpc->mb), *reqlen, &valid);
|
||||
toks = json_parse_input(ctx, membuf_elems(&plugin->rpc_conn.mb), *reqlen, &valid);
|
||||
if (!valid)
|
||||
plugin_err("Malformed JSON reply '%.*s'",
|
||||
*reqlen, membuf_elems(&rpc->mb));
|
||||
*reqlen, membuf_elems(&plugin->rpc_conn.mb));
|
||||
|
||||
*contents = json_get_member(membuf_elems(&rpc->mb), toks, "error");
|
||||
*contents = json_get_member(membuf_elems(&plugin->rpc_conn.mb), toks, "error");
|
||||
if (*contents)
|
||||
*error = true;
|
||||
else {
|
||||
*contents = json_get_member(membuf_elems(&rpc->mb), toks,
|
||||
*contents = json_get_member(membuf_elems(&plugin->rpc_conn.mb), toks,
|
||||
"result");
|
||||
if (!*contents)
|
||||
plugin_err("JSON reply with no 'result' nor 'error'? '%.*s'",
|
||||
*reqlen, membuf_elems(&rpc->mb));
|
||||
*reqlen, membuf_elems(&plugin->rpc_conn.mb));
|
||||
*error = false;
|
||||
}
|
||||
return toks;
|
||||
@@ -402,9 +356,10 @@ static struct json_out *start_json_request(const tal_t *ctx,
|
||||
|
||||
/* Synchronous routine to send command and extract single field from response */
|
||||
const char *rpc_delve(const tal_t *ctx,
|
||||
struct plugin *plugin,
|
||||
const char *method,
|
||||
const struct json_out *params TAKES,
|
||||
struct plugin_conn *rpc, const char *guide)
|
||||
const char *guide)
|
||||
{
|
||||
bool error;
|
||||
const jsmntok_t *contents, *t;
|
||||
@@ -413,24 +368,24 @@ const char *rpc_delve(const tal_t *ctx,
|
||||
struct json_out *jout;
|
||||
|
||||
jout = start_json_request(tmpctx, 0, method, params);
|
||||
finish_and_send_json(rpc->fd, jout);
|
||||
finish_and_send_json(plugin->rpc_conn.fd, jout);
|
||||
|
||||
read_rpc_reply(tmpctx, rpc, &contents, &error, &reqlen);
|
||||
read_rpc_reply(tmpctx, plugin, &contents, &error, &reqlen);
|
||||
if (error)
|
||||
plugin_err("Got error reply to %s: '%.*s'",
|
||||
method, reqlen, membuf_elems(&rpc->mb));
|
||||
method, reqlen, membuf_elems(&plugin->rpc_conn.mb));
|
||||
|
||||
t = json_delve(membuf_elems(&rpc->mb), contents, guide);
|
||||
t = json_delve(membuf_elems(&plugin->rpc_conn.mb), contents, guide);
|
||||
if (!t)
|
||||
plugin_err("Could not find %s in reply to %s: '%.*s'",
|
||||
guide, method, reqlen, membuf_elems(&rpc->mb));
|
||||
guide, method, reqlen, membuf_elems(&plugin->rpc_conn.mb));
|
||||
|
||||
ret = json_strdup(ctx, membuf_elems(&rpc->mb), t);
|
||||
membuf_consume(&rpc->mb, reqlen);
|
||||
ret = json_strdup(ctx, membuf_elems(&plugin->rpc_conn.mb), t);
|
||||
membuf_consume(&plugin->rpc_conn.mb, reqlen);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void handle_rpc_reply(struct plugin_conn *rpc)
|
||||
static void handle_rpc_reply(struct plugin *plugin)
|
||||
{
|
||||
int reqlen;
|
||||
const jsmntok_t *toks, *contents, *t;
|
||||
@@ -439,33 +394,33 @@ static void handle_rpc_reply(struct plugin_conn *rpc)
|
||||
u64 id;
|
||||
bool error;
|
||||
|
||||
toks = read_rpc_reply(tmpctx, rpc, &contents, &error, &reqlen);
|
||||
toks = read_rpc_reply(tmpctx, plugin, &contents, &error, &reqlen);
|
||||
|
||||
t = json_get_member(membuf_elems(&rpc->mb), toks, "id");
|
||||
t = json_get_member(membuf_elems(&plugin->rpc_conn.mb), toks, "id");
|
||||
if (!t)
|
||||
plugin_err("JSON reply without id '%.*s'",
|
||||
reqlen, membuf_elems(&rpc->mb));
|
||||
if (!json_to_u64(membuf_elems(&rpc->mb), t, &id))
|
||||
reqlen, membuf_elems(&plugin->rpc_conn.mb));
|
||||
if (!json_to_u64(membuf_elems(&plugin->rpc_conn.mb), t, &id))
|
||||
plugin_err("JSON reply without numeric id '%.*s'",
|
||||
reqlen, membuf_elems(&rpc->mb));
|
||||
reqlen, membuf_elems(&plugin->rpc_conn.mb));
|
||||
out = uintmap_get(&out_reqs, id);
|
||||
if (!out)
|
||||
plugin_err("JSON reply with unknown id '%.*s' (%"PRIu64")",
|
||||
reqlen, membuf_elems(&rpc->mb), id);
|
||||
reqlen, membuf_elems(&plugin->rpc_conn.mb), id);
|
||||
|
||||
/* We want to free this if callback doesn't. */
|
||||
tal_steal(tmpctx, out);
|
||||
uintmap_del(&out_reqs, out->id);
|
||||
|
||||
if (error)
|
||||
res = out->errcb(out->cmd, membuf_elems(&rpc->mb), contents,
|
||||
out->arg);
|
||||
res = out->errcb(out->cmd, membuf_elems(&plugin->rpc_conn.mb),
|
||||
contents, out->arg);
|
||||
else
|
||||
res = out->cb(out->cmd, membuf_elems(&rpc->mb), contents,
|
||||
out->arg);
|
||||
res = out->cb(out->cmd, membuf_elems(&plugin->rpc_conn.mb),
|
||||
contents, out->arg);
|
||||
|
||||
assert(res == &pending || res == &complete);
|
||||
membuf_consume(&rpc->mb, reqlen);
|
||||
membuf_consume(&plugin->rpc_conn.mb, reqlen);
|
||||
}
|
||||
|
||||
struct command_result *
|
||||
@@ -494,7 +449,7 @@ send_outreq_(struct command *cmd,
|
||||
uintmap_add(&out_reqs, out->id, out);
|
||||
|
||||
jout = start_json_request(tmpctx, out->id, method, params);
|
||||
finish_and_send_json(rpc_conn.fd, jout);
|
||||
finish_and_send_json(cmd->plugin->rpc_conn.fd, jout);
|
||||
|
||||
return &pending;
|
||||
}
|
||||
@@ -572,7 +527,7 @@ static struct command_result *handle_init(struct command *cmd,
|
||||
chainparams = chainparams_for_network(network);
|
||||
|
||||
rpctok = json_delve(buf, configtok, ".rpc-file");
|
||||
rpc_conn.fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
p->rpc_conn.fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (rpctok->end - rpctok->start + 1 > sizeof(addr.sun_path))
|
||||
plugin_err("rpc filename '%.*s' too long",
|
||||
rpctok->end - rpctok->start,
|
||||
@@ -581,15 +536,14 @@ static struct command_result *handle_init(struct command *cmd,
|
||||
addr.sun_path[rpctok->end - rpctok->start] = '\0';
|
||||
addr.sun_family = AF_UNIX;
|
||||
|
||||
if (connect(rpc_conn.fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
|
||||
if (connect(p->rpc_conn.fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
|
||||
plugin_err("Connecting to '%.*s': %s",
|
||||
rpctok->end - rpctok->start, buf + rpctok->start,
|
||||
strerror(errno));
|
||||
|
||||
param_obj = json_out_obj(NULL, "config", "allow-deprecated-apis");
|
||||
deprecated_apis = streq(rpc_delve(tmpctx, "listconfigs",
|
||||
deprecated_apis = streq(rpc_delve(tmpctx, p, "listconfigs",
|
||||
take(param_obj),
|
||||
&rpc_conn,
|
||||
".allow-deprecated-apis"),
|
||||
"true");
|
||||
opttok = json_get_member(buf, params, "options");
|
||||
@@ -610,7 +564,7 @@ static struct command_result *handle_init(struct command *cmd,
|
||||
}
|
||||
|
||||
if (p->init)
|
||||
p->init(&rpc_conn, buf, configtok);
|
||||
p->init(p, buf, configtok);
|
||||
|
||||
return command_success_str(cmd, NULL);
|
||||
}
|
||||
@@ -652,7 +606,7 @@ static void setup_command_usage(const struct plugin_command *commands,
|
||||
}
|
||||
}
|
||||
|
||||
static void call_plugin_timer(struct plugin_conn *rpc, struct timer *timer)
|
||||
static void call_plugin_timer(struct rpc_conn *rpc, struct timer *timer)
|
||||
{
|
||||
struct plugin_timer *t = container_of(timer, struct plugin_timer, timer);
|
||||
|
||||
@@ -667,7 +621,7 @@ static void destroy_plugin_timer(struct plugin_timer *timer)
|
||||
timer_del(&timers, &timer->timer);
|
||||
}
|
||||
|
||||
struct plugin_timer *plugin_timer(struct plugin_conn *rpc, struct timerel t,
|
||||
struct plugin_timer *plugin_timer(struct rpc_conn *rpc, struct timerel t,
|
||||
struct command_result *(*cb)(void))
|
||||
{
|
||||
struct plugin_timer *timer = tal(NULL, struct plugin_timer);
|
||||
@@ -927,7 +881,7 @@ static struct io_plan *stdout_conn_init(struct io_conn *conn,
|
||||
}
|
||||
|
||||
static struct plugin *new_plugin(const tal_t *ctx,
|
||||
void (*init)(struct plugin_conn *rpc,
|
||||
void (*init)(struct plugin *p,
|
||||
const char *buf, const jsmntok_t *),
|
||||
const enum plugin_restartability restartability,
|
||||
const struct plugin_command *commands,
|
||||
@@ -945,6 +899,10 @@ static struct plugin *new_plugin(const tal_t *ctx,
|
||||
p->js_arr = tal_arr(p, struct json_stream *, 0);
|
||||
p->used = 0;
|
||||
p->len_read = 0;
|
||||
/* rpc. TODO: use ccan/io also for RPC */
|
||||
membuf_init(&p->rpc_conn.mb,
|
||||
tal_arr(p, char, READ_CHUNKSIZE), READ_CHUNKSIZE,
|
||||
membuf_tal_realloc);
|
||||
|
||||
p->init = init;
|
||||
p->manifested = p->initialized = false;
|
||||
@@ -972,7 +930,7 @@ static struct plugin *new_plugin(const tal_t *ctx,
|
||||
}
|
||||
|
||||
void plugin_main(char *argv[],
|
||||
void (*init)(struct plugin_conn *rpc,
|
||||
void (*init)(struct plugin *p,
|
||||
const char *buf, const jsmntok_t *),
|
||||
const enum plugin_restartability restartability,
|
||||
const struct plugin_command *commands,
|
||||
@@ -1000,12 +958,9 @@ void plugin_main(char *argv[],
|
||||
notif_subs, num_notif_subs, hook_subs,
|
||||
num_hook_subs, ap);
|
||||
va_end(ap);
|
||||
uintmap_init(&out_reqs);
|
||||
|
||||
timers_init(&timers, time_mono());
|
||||
membuf_init(&rpc_conn.mb,
|
||||
tal_arr(plugin, char, READ_CHUNKSIZE), READ_CHUNKSIZE,
|
||||
membuf_tal_realloc);
|
||||
uintmap_init(&out_reqs);
|
||||
|
||||
io_new_conn(plugin, STDIN_FILENO, stdin_conn_init, plugin);
|
||||
io_new_conn(plugin, STDOUT_FILENO, stdout_conn_init, plugin);
|
||||
@@ -1015,14 +970,14 @@ void plugin_main(char *argv[],
|
||||
|
||||
clean_tmpctx();
|
||||
|
||||
if (membuf_num_elems(&rpc_conn.mb) != 0) {
|
||||
handle_rpc_reply(&rpc_conn);
|
||||
if (membuf_num_elems(&plugin->rpc_conn.mb) != 0) {
|
||||
handle_rpc_reply(plugin);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Will only exit if a timer has expired. */
|
||||
io_loop(&timers, &expired);
|
||||
call_plugin_timer(&rpc_conn, expired);
|
||||
call_plugin_timer(&plugin->rpc_conn, expired);
|
||||
}
|
||||
|
||||
tal_free(plugin);
|
||||
|
||||
Reference in New Issue
Block a user