diff --git a/plugins/libplugin.c b/plugins/libplugin.c index 4ac54bf3f..2d4fda783 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -8,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -38,6 +40,17 @@ bool deprecated_apis; extern const struct chainparams *chainparams; struct plugin { + /* lightningd interaction */ + struct io_conn *stdin_conn; + struct io_conn *stdout_conn; + + /* To read from lightningd */ + char *buffer; + size_t used, len_read; + + /* To write to lightningd */ + struct json_stream **js_arr; + enum plugin_restartability restartability; const struct plugin_command *commands; size_t num_commands; @@ -46,6 +59,14 @@ struct plugin { const struct plugin_hook *hook_subs; size_t num_hook_subs; struct plugin_option *opts; + + /* Anything special to do at init ? */ + void (*init)(struct plugin_conn *, + const char *buf, const jsmntok_t *); + /* Has the manifest been sent already ? */ + bool manifested; + /* Has init been received ? */ + bool initialized; }; struct plugin_timer { @@ -65,6 +86,7 @@ struct command { u64 *id; const char *methodname; bool usage_only; + struct plugin *plugin; }; struct out_req { @@ -148,43 +170,6 @@ static int read_json(struct plugin_conn *conn) return end + 2 - membuf_elems(&conn->mb); } -static struct command *read_json_request(const tal_t *ctx, - struct plugin_conn *conn, - struct plugin_conn *rpc, - const jsmntok_t **params, - int *reqlen) -{ - const jsmntok_t *toks, *id, *method; - bool valid; - struct command *cmd = tal(ctx, struct command); - - *reqlen = read_json(conn); - toks = json_parse_input(cmd, membuf_elems(&conn->mb), *reqlen, &valid); - if (!valid) - plugin_err("Malformed JSON input '%.*s'", - *reqlen, membuf_elems(&conn->mb)); - - if (toks[0].type != JSMN_OBJECT) - plugin_err("Malformed JSON command '%*.s' is not an object", - *reqlen, membuf_elems(&conn->mb)); - - method = json_get_member(membuf_elems(&conn->mb), toks, "method"); - *params = json_get_member(membuf_elems(&conn->mb), toks, "params"); - id = json_get_member(membuf_elems(&conn->mb), toks, "id"); - if (id) { - cmd->id = tal(cmd, u64); - if (!json_to_u64(membuf_elems(&conn->mb), id, cmd->id)) - plugin_err("JSON id '%*.s' is not a number", - id->end - id->start, - membuf_elems(&conn->mb) + id->start); - } else - cmd->id = NULL; - cmd->usage_only = false; - cmd->methodname = json_strdup(cmd, membuf_elems(&conn->mb), method); - - return cmd; -} - /* This starts a JSON RPC message with boilerplate */ static struct json_out *start_json_rpc(const tal_t *ctx, u64 id) { @@ -494,10 +479,10 @@ send_outreq_(struct command *cmd, } static struct command_result * -handle_getmanifest(struct command *getmanifest_cmd, - struct plugin *p) +handle_getmanifest(struct command *getmanifest_cmd) { struct json_out *params = json_out_new(tmpctx); + struct plugin *p = getmanifest_cmd->plugin; json_out_start(params, NULL, '{'); json_out_start(params, "options", '['); @@ -542,18 +527,16 @@ handle_getmanifest(struct command *getmanifest_cmd, return command_success(getmanifest_cmd, params); } -static struct command_result *handle_init(struct command *init_cmd, +static struct command_result *handle_init(struct command *cmd, const char *buf, - const jsmntok_t *params, - const struct plugin_option *opts, - void (*init)(struct plugin_conn *, - const char *buf, const jsmntok_t *)) + const jsmntok_t *params) { const jsmntok_t *configtok, *rpctok, *dirtok, *opttok, *nettok, *t; struct sockaddr_un addr; size_t i; char *dir, *network; struct json_out *param_obj; + struct plugin *p = cmd->plugin; configtok = json_delve(buf, params, ".configuration"); @@ -591,24 +574,24 @@ static struct command_result *handle_init(struct command *init_cmd, opttok = json_get_member(buf, params, "options"); json_for_each_obj(i, t, opttok) { char *opt = json_strdup(NULL, buf, t); - for (size_t i = 0; i < tal_count(opts); i++) { + for (size_t i = 0; i < tal_count(p->opts); i++) { char *problem; - if (!streq(opts[i].name, opt)) + if (!streq(p->opts[i].name, opt)) continue; - problem = opts[i].handle(json_strdup(opt, buf, t+1), - opts[i].arg); + problem = p->opts[i].handle(json_strdup(opt, buf, t+1), + p->opts[i].arg); if (problem) plugin_err("option '%s': %s", - opts[i].name, problem); + p->opts[i].name, problem); break; } tal_free(opt); } - if (init) - init(&rpc_conn, buf, configtok); + if (p->init) + p->init(&rpc_conn, buf, configtok); - return command_success_str(init_cmd, NULL); + return command_success_str(cmd, NULL); } char *u64_option(const char *arg, u64 *i) @@ -631,47 +614,6 @@ char *charp_option(const char *arg, char **p) return NULL; } -static void handle_new_command(const tal_t *ctx, - struct plugin_conn *request_conn, - struct plugin_conn *rpc_conn, - struct plugin *p) -{ - struct command *cmd; - const jsmntok_t *params; - int reqlen; - - cmd = read_json_request(ctx, request_conn, rpc_conn, ¶ms, &reqlen); - /* If this is a notification. */ - if (!cmd->id) { - for (size_t i = 0; i < p->num_notif_subs; i++) { - if (streq(cmd->methodname, p->notif_subs[i].name)) { - p->notif_subs[i].handle(cmd, membuf_elems(&request_conn->mb), - params); - membuf_consume(&request_conn->mb, reqlen); - } - } - return; - } - for (size_t i = 0; i < p->num_hook_subs; i++) { - if (streq(cmd->methodname, p->hook_subs[i].name)) { - p->hook_subs[i].handle(cmd, membuf_elems(&request_conn->mb), - params); - membuf_consume(&request_conn->mb, reqlen); - return; - } - } - for (size_t i = 0; i < p->num_commands; i++) { - if (streq(cmd->methodname, p->commands[i].name)) { - p->commands[i].handle(cmd, membuf_elems(&request_conn->mb), - params); - membuf_consume(&request_conn->mb, reqlen); - return; - } - } - - plugin_err("Unknown command '%s'", cmd->methodname); -} - static void setup_command_usage(const struct plugin_command *commands, size_t num_commands) { @@ -757,7 +699,215 @@ void plugin_log(enum log_level l, const char *fmt, ...) va_end(ap); } +static void ld_command_handle(struct plugin *plugin, + struct command *cmd, + const jsmntok_t *toks) +{ + const jsmntok_t *idtok, *methtok, *paramstok; + + idtok = json_get_member(plugin->buffer, toks, "id"); + methtok = json_get_member(plugin->buffer, toks, "method"); + paramstok = json_get_member(plugin->buffer, toks, "params"); + + if (!methtok || !paramstok) + plugin_err("Malformed JSON-RPC notification missing " + "\"method\" or \"params\": %.*s", + json_tok_full_len(toks), + json_tok_full(plugin->buffer, toks)); + + cmd->plugin = plugin; + cmd->id = NULL; + cmd->usage_only = false; + cmd->methodname = json_strdup(cmd, plugin->buffer, methtok); + if (idtok) { + cmd->id = tal(cmd, u64); + if (!json_to_u64(plugin->buffer, idtok, cmd->id)) + plugin_err("JSON id '%*.s' is not a number", + json_tok_full_len(idtok), + json_tok_full(plugin->buffer, idtok)); + } + + if (!plugin->manifested) { + if (streq(cmd->methodname, "getmanifest")) { + handle_getmanifest(cmd); + plugin->manifested = true; + return; + } + plugin_err("Did not receive 'getmanifest' yet, but got '%s'" + " instead", cmd->methodname); + } + + if (!plugin->initialized) { + if (streq(cmd->methodname, "init")) { + handle_init(cmd, plugin->buffer, paramstok); + plugin->initialized = true; + return; + } + plugin_err("Did not receive 'init' yet, but got '%s'" + " instead", cmd->methodname); + } + + /* If that's a notification. */ + if (!cmd->id) { + for (size_t i = 0; i < plugin->num_notif_subs; i++) { + if (streq(cmd->methodname, + plugin->notif_subs[i].name)) { + plugin->notif_subs[i].handle(cmd, + plugin->buffer, + paramstok); + return; + } + } + plugin_err("Unregistered notification %.*s", + json_tok_full_len(methtok), + json_tok_full(plugin->buffer, methtok)); + } + + for (size_t i = 0; i < plugin->num_hook_subs; i++) { + if (streq(cmd->methodname, plugin->hook_subs[i].name)) { + plugin->hook_subs[i].handle(cmd, + plugin->buffer, + paramstok); + return; + } + } + + for (size_t i = 0; i < plugin->num_commands; i++) { + if (streq(cmd->methodname, plugin->commands[i].name)) { + plugin->commands[i].handle(cmd, + plugin->buffer, + paramstok); + return; + } + } + + plugin_err("Unknown command '%s'", cmd->methodname); +} + +/** + * Try to parse a complete message from lightningd's buffer, and return true + * if we could handle it. + */ +static bool ld_read_json_one(struct plugin *plugin) +{ + bool valid; + const jsmntok_t *toks, *jrtok; + struct command *cmd = tal(plugin, struct command); + + /* FIXME: This could be done more efficiently by storing the + * toks and doing an incremental parse, like lightning-cli + * does. */ + toks = json_parse_input(NULL, plugin->buffer, plugin->used, + &valid); + if (!toks) { + if (!valid) { + plugin_err("Failed to parse JSON response '%.*s'", + (int)plugin->used, plugin->buffer); + return false; + } + /* We need more. */ + return false; + } + + /* Empty buffer? (eg. just whitespace). */ + if (tal_count(toks) == 1) { + plugin->used = 0; + return false; + } + + jrtok = json_get_member(plugin->buffer, toks, "jsonrpc"); + if (!jrtok) { + plugin_err("JSON-RPC message does not contain \"jsonrpc\" field"); + return false; + } + + ld_command_handle(plugin, cmd, toks); + + /* Move this object out of the buffer */ + memmove(plugin->buffer, plugin->buffer + toks[0].end, + tal_count(plugin->buffer) - toks[0].end); + plugin->used -= toks[0].end; + tal_free(toks); + + return true; +} + +static struct io_plan *ld_read_json(struct io_conn *conn, + struct plugin *plugin) +{ + plugin->used += plugin->len_read; + if (plugin->used && plugin->used == tal_count(plugin->buffer)) + tal_resize(&plugin->buffer, plugin->used * 2); + + /* Read and process all messages from the connection */ + while (ld_read_json_one(plugin)) + ; + + /* Now read more from the connection */ + return io_read_partial(plugin->stdin_conn, + plugin->buffer + plugin->used, + tal_count(plugin->buffer) - plugin->used, + &plugin->len_read, ld_read_json, plugin); +} + +static struct io_plan *ld_write_json(struct io_conn *conn, + struct plugin *plugin); + +static struct io_plan * +ld_stream_complete(struct io_conn *conn, struct json_stream *js, + struct plugin *plugin) +{ + assert(tal_count(plugin->js_arr) > 0); + /* Remove js and shift all remainig over */ + tal_arr_remove(&plugin->js_arr, 0); + + /* It got dropped off the queue, free it. */ + tal_free(js); + + return ld_write_json(conn, plugin); +} + +static struct io_plan *ld_write_json(struct io_conn *conn, + struct plugin *plugin) +{ + if (tal_count(plugin->js_arr) > 0) + return json_stream_output(plugin->js_arr[0], plugin->stdout_conn, + ld_stream_complete, plugin); + + return io_out_wait(conn, plugin, ld_write_json, plugin); +} + +static void ld_conn_finish(struct io_conn *conn, struct plugin *plugin) +{ + /* Without one of the conns there is no reason to stay alive. That + * certainly means lightningd died, since there is no cleaner way + * to stop, return 0. */ + exit(0); +} + +/* lightningd writes on our stdin */ +static struct io_plan *stdin_conn_init(struct io_conn *conn, + struct plugin *plugin) +{ + plugin->stdin_conn = conn; + io_set_finish(conn, ld_conn_finish, plugin); + return io_read_partial(plugin->stdin_conn, plugin->buffer, + tal_bytelen(plugin->buffer), &plugin->len_read, + ld_read_json, plugin); +} + +/* lightningd reads from our stdout */ +static struct io_plan *stdout_conn_init(struct io_conn *conn, + struct plugin *plugin) +{ + plugin->stdout_conn = conn; + io_set_finish(conn, ld_conn_finish, plugin); + return io_wait(plugin->stdout_conn, plugin, ld_write_json, plugin); +} + static struct plugin *new_plugin(const tal_t *ctx, + void (*init)(struct plugin_conn *rpc, + const char *buf, const jsmntok_t *), const enum plugin_restartability restartability, const struct plugin_command *commands, size_t num_commands, @@ -770,7 +920,15 @@ static struct plugin *new_plugin(const tal_t *ctx, const char *optname; struct plugin *p = tal(ctx, struct plugin); + p->buffer = tal_arr(p, char, 64); + p->js_arr = tal_arr(p, struct json_stream *, 0); + p->used = 0; + p->len_read = 0; + + p->init = init; + p->manifested = p->initialized = false; p->restartability = restartability; + p->commands = commands; p->num_commands = num_commands; p->notif_subs = notif_subs; @@ -805,11 +963,6 @@ void plugin_main(char *argv[], ...) { struct plugin *plugin; - struct plugin_conn request_conn; - struct command *cmd; - const jsmntok_t *params; - int reqlen; - struct pollfd fds[2]; va_list ap; setup_locale(); @@ -822,7 +975,7 @@ void plugin_main(char *argv[], setup_command_usage(commands, num_commands); va_start(ap, num_hook_subs); - plugin = new_plugin(NULL, restartability, commands, num_commands, + plugin = new_plugin(NULL, init, restartability, commands, num_commands, notif_subs, num_notif_subs, hook_subs, num_hook_subs, ap); va_end(ap); @@ -831,73 +984,24 @@ void plugin_main(char *argv[], membuf_init(&rpc_conn.mb, tal_arr(plugin, char, READ_CHUNKSIZE), READ_CHUNKSIZE, membuf_tal_realloc); - request_conn.fd = STDIN_FILENO; - membuf_init(&request_conn.mb, - tal_arr(plugin, char, READ_CHUNKSIZE), READ_CHUNKSIZE, - membuf_tal_realloc); uintmap_init(&out_reqs); - cmd = read_json_request(tmpctx, &request_conn, NULL, - ¶ms, &reqlen); - if (!streq(cmd->methodname, "getmanifest")) - plugin_err("Expected getmanifest not %s", cmd->methodname); - - membuf_consume(&request_conn.mb, reqlen); - handle_getmanifest(cmd, plugin); - - cmd = read_json_request(tmpctx, &request_conn, &rpc_conn, - ¶ms, &reqlen); - if (!streq(cmd->methodname, "init")) - plugin_err("Expected init not %s", cmd->methodname); - - handle_init(cmd, membuf_elems(&request_conn.mb), - params, plugin->opts, init); - membuf_consume(&request_conn.mb, reqlen); - - /* Set up fds for poll. */ - fds[0].fd = STDIN_FILENO; - fds[0].events = POLLIN; - fds[1].fd = rpc_conn.fd; - fds[1].events = POLLIN; + io_new_conn(plugin, STDIN_FILENO, stdin_conn_init, plugin); + io_new_conn(plugin, STDOUT_FILENO, stdout_conn_init, plugin); for (;;) { - struct timer *expired; - struct timemono now, first; - int t; + struct timer *expired = NULL; clean_tmpctx(); - /* If we already have some input, process now. */ - if (membuf_num_elems(&request_conn.mb) != 0) { - handle_new_command(plugin, &request_conn, &rpc_conn, plugin); - continue; - } if (membuf_num_elems(&rpc_conn.mb) != 0) { handle_rpc_reply(&rpc_conn); continue; } - /* Handle any timeouts */ - now = time_mono(); - expired = timers_expire(&timers, now); - if (expired) { - call_plugin_timer(&rpc_conn, expired); - continue; - } - - /* If we have a pending timer, timeout then */ - if (timer_earliest(&timers, &first)) - t = time_to_msec(timemono_between(first, now)); - else - t = -1; - - /* Otherwise, we poll. */ - poll(fds, 2, t); - - if (fds[0].revents & POLLIN) - handle_new_command(plugin, &request_conn, &rpc_conn, plugin); - if (fds[1].revents & POLLIN) - handle_rpc_reply(&rpc_conn); + /* Will only exit if a timer has expired. */ + io_loop(&timers, &expired); + call_plugin_timer(&rpc_conn, expired); } tal_free(plugin);