diff --git a/plugins/bkpr/test/run-bkpr_db.c b/plugins/bkpr/test/run-bkpr_db.c index 70248b287..9cfccc539 100644 --- a/plugins/bkpr/test/run-bkpr_db.c +++ b/plugins/bkpr/test/run-bkpr_db.c @@ -292,7 +292,7 @@ int main(int argc, char *argv[]) bool ok = true; /* Dummy for migration hooks */ struct plugin *plugin = tal(NULL, struct plugin); - plugin->js_arr = tal_arr(plugin, struct json_stream *, 0); + list_head_init(&plugin->js_list); common_setup(argv[0]); diff --git a/plugins/bkpr/test/run-recorder.c b/plugins/bkpr/test/run-recorder.c index a90035187..b569eeeb7 100644 --- a/plugins/bkpr/test/run-recorder.c +++ b/plugins/bkpr/test/run-recorder.c @@ -1421,7 +1421,7 @@ int main(int argc, char *argv[]) bool ok = true; /* Dummy for migration hooks */ struct plugin *plugin = tal(NULL, struct plugin); - plugin->js_arr = tal_arr(plugin, struct json_stream *, 0); + list_head_init(&plugin->js_list); common_setup(argv[0]); diff --git a/plugins/libplugin.c b/plugins/libplugin.c index 5506196fd..36d187798 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -32,6 +32,12 @@ struct rpc_conn { MEMBUF(char) mb; }; +/* We can have more than one of these pending at once. */ +struct jstream { + struct list_node list; + struct json_stream *js; +}; + struct plugin { /* lightningd interaction */ struct io_conn *stdin_conn; @@ -47,11 +53,11 @@ struct plugin { jsmntok_t *toks; /* To write to lightningd */ - struct json_stream **js_arr; + struct list_head js_list; /* Asynchronous RPC interaction */ struct io_conn *io_rpc_conn; - struct json_stream **rpc_js_arr; + struct list_head rpc_js_list; char *rpc_buffer; size_t rpc_used, rpc_len_read, rpc_read_offset; jsmn_parser rpc_parser; @@ -128,15 +134,17 @@ struct command_result *command_done(void) static void ld_send(struct plugin *plugin, struct json_stream *stream) { - tal_steal(plugin->js_arr, stream); - tal_arr_expand(&plugin->js_arr, stream); + struct jstream *jstr = tal(plugin, struct jstream); + jstr->js = tal_steal(jstr, stream); + list_add_tail(&plugin->js_list, &jstr->list); io_wake(plugin); } static void ld_rpc_send(struct plugin *plugin, struct json_stream *stream) { - tal_steal(plugin->rpc_js_arr, stream); - tal_arr_expand(&plugin->rpc_js_arr, stream); + struct jstream *jstr = tal(plugin, struct jstream); + jstr->js = tal_steal(jstr, stream); + list_add_tail(&plugin->rpc_js_list, &jstr->list); io_wake(plugin->io_rpc_conn); } @@ -928,12 +936,10 @@ static struct io_plan * rpc_stream_complete(struct io_conn *conn, struct json_stream *js, struct plugin *plugin) { - assert(tal_count(plugin->rpc_js_arr) > 0); - /* Remove js and shift all remaining over */ - tal_arr_remove(&plugin->rpc_js_arr, 0); - - /* It got dropped off the queue, free it. */ - tal_free(js); + struct jstream *jstr = list_pop(&plugin->rpc_js_list, struct jstream, list); + assert(jstr); + assert(jstr->js == js); + tal_free(jstr); return rpc_conn_write_request(conn, plugin); } @@ -941,8 +947,9 @@ rpc_stream_complete(struct io_conn *conn, struct json_stream *js, static struct io_plan *rpc_conn_write_request(struct io_conn *conn, struct plugin *plugin) { - if (tal_count(plugin->rpc_js_arr) > 0) - return json_stream_output(plugin->rpc_js_arr[0], conn, + struct jstream *jstr = list_top(&plugin->rpc_js_list, struct jstream, list); + if (jstr) + return json_stream_output(jstr->js, conn, rpc_stream_complete, plugin); return io_out_wait(conn, plugin->io_rpc_conn, @@ -1548,12 +1555,10 @@ static struct io_plan * ld_stream_complete(struct io_conn *conn, struct json_stream *js, struct plugin *plugin) { - assert(tal_count(plugin->js_arr) > 0); - /* Remove js and shift all remainig over */ - tal_arr_remove(&plugin->js_arr, 0); - - /* It got dropped off the queue, free it. */ - tal_free(js); + struct jstream *jstr = list_pop(&plugin->js_list, struct jstream, list); + assert(jstr); + assert(jstr->js == js); + tal_free(jstr); return ld_write_json(conn, plugin); } @@ -1561,8 +1566,9 @@ ld_stream_complete(struct io_conn *conn, struct json_stream *js, static struct io_plan *ld_write_json(struct io_conn *conn, struct plugin *plugin) { - if (tal_count(plugin->js_arr) > 0) - return json_stream_output(plugin->js_arr[0], plugin->stdout_conn, + struct jstream *jstr = list_top(&plugin->js_list, struct jstream, list); + if (jstr) + return json_stream_output(jstr->js, plugin->stdout_conn, ld_stream_complete, plugin); /* If we were simply flushing final output, stop now. */ @@ -1626,14 +1632,14 @@ static struct plugin *new_plugin(const tal_t *ctx, name[path_ext_off(name)] = '\0'; p->id = name; p->buffer = tal_arr(p, char, 64); - p->js_arr = tal_arr(p, struct json_stream *, 0); + list_head_init(&p->js_list); p->used = 0; p->len_read = 0; jsmn_init(&p->parser); p->toks = toks_alloc(p); /* Async RPC */ p->rpc_buffer = tal_arr(p, char, 64); - p->rpc_js_arr = tal_arr(p, struct json_stream *, 0); + list_head_init(&p->rpc_js_list); p->rpc_used = 0; p->rpc_read_offset = 0; p->rpc_len_read = 0;