libplugin: optimize parsing lightningd rpc responses.

autoclean was using 98% of its time in memmove; we should simply keep
an offset, and memmove when it's empty.  And also, only memmove the
used region, not the entire buffer!

Running on product of giantnodes.py:

	$ time l1-cli autoclean-once failedpays 1
	{
	   "autoclean": {
	      "failedpays": {
	         "cleaned": 26895,
	         "uncleaned": 1000000
	      }
	   }
	}

Before:
	real	20m46.579s
	user	0m0.000s
	sys	0m0.001s
	
After:
	real	2m10.568s
	user	0m0.001s
	sys	0m0.000s

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2022-09-19 10:23:00 +09:30
committed by Christian Decker
parent 540a6e4b99
commit 4d8c321517

View File

@@ -53,7 +53,7 @@ struct plugin {
struct io_conn *io_rpc_conn;
struct json_stream **rpc_js_arr;
char *rpc_buffer;
size_t rpc_used, rpc_len_read;
size_t rpc_used, rpc_len_read, rpc_read_offset;
jsmn_parser rpc_parser;
jsmntok_t *rpc_toks;
/* Tracking async RPC requests */
@@ -672,20 +672,21 @@ static void handle_rpc_reply(struct plugin *plugin, const jsmntok_t *toks)
const jsmntok_t *idtok, *contenttok;
struct out_req *out;
struct command_result *res;
const char *buf = plugin->rpc_buffer + plugin->rpc_read_offset;
idtok = json_get_member(plugin->rpc_buffer, toks, "id");
idtok = json_get_member(buf, toks, "id");
if (!idtok)
/* FIXME: Don't simply ignore notifications! */
return;
out = strmap_getn(&plugin->out_reqs,
plugin->rpc_buffer + idtok->start,
buf + idtok->start,
idtok->end - idtok->start);
if (!out) {
/* This can actually happen, if they free req! */
plugin_log(plugin, LOG_DBG, "JSON reply with unknown id '%.*s'",
json_tok_full_len(toks),
json_tok_full(plugin->rpc_buffer, toks));
json_tok_full(buf, toks));
return;
}
@@ -696,27 +697,23 @@ static void handle_rpc_reply(struct plugin *plugin, const jsmntok_t *toks)
/* We want to free this if callback doesn't. */
tal_steal(tmpctx, out);
contenttok = json_get_member(plugin->rpc_buffer, toks, "error");
contenttok = json_get_member(buf, toks, "error");
if (contenttok) {
if (out->errcb)
res = out->errcb(out->cmd, plugin->rpc_buffer,
contenttok, out->arg);
res = out->errcb(out->cmd, buf, contenttok, out->arg);
else
res = out->cb(out->cmd, plugin->rpc_buffer,
toks, out->arg);
res = out->cb(out->cmd, buf, toks, out->arg);
} else {
contenttok = json_get_member(plugin->rpc_buffer, toks, "result");
contenttok = json_get_member(buf, toks, "result");
if (!contenttok)
plugin_err(plugin, "Bad JSONRPC, no 'error' nor 'result': '%.*s'",
json_tok_full_len(toks),
json_tok_full(plugin->rpc_buffer, toks));
json_tok_full(buf, toks));
/* errcb is NULL if it's a single whole-object callback */
if (out->errcb)
res = out->cb(out->cmd, plugin->rpc_buffer, contenttok,
out->arg);
res = out->cb(out->cmd, buf, contenttok, out->arg);
else
res = out->cb(out->cmd, plugin->rpc_buffer, toks,
out->arg);
res = out->cb(out->cmd, buf, toks, out->arg);
}
assert(res == &pending || res == &complete);
@@ -861,42 +858,48 @@ static bool rpc_read_response_one(struct plugin *plugin)
bool complete;
if (!json_parse_input(&plugin->rpc_parser, &plugin->rpc_toks,
plugin->rpc_buffer, plugin->rpc_used, &complete)) {
plugin->rpc_buffer + plugin->rpc_read_offset,
plugin->rpc_used - plugin->rpc_read_offset,
&complete)) {
plugin_err(plugin, "Failed to parse RPC JSON response '%.*s'",
(int)plugin->rpc_used, plugin->rpc_buffer);
return false;
(int)(plugin->rpc_used - plugin->rpc_read_offset),
plugin->rpc_buffer + plugin->rpc_read_offset);
}
if (!complete) {
/* We need more. */
return false;
goto compact;
}
/* Empty buffer? (eg. just whitespace). */
if (tal_count(plugin->rpc_toks) == 1) {
plugin->rpc_used = 0;
jsmn_init(&plugin->rpc_parser);
toks_reset(plugin->rpc_toks);
return false;
goto compact;
}
jrtok = json_get_member(plugin->rpc_buffer, plugin->rpc_toks, "jsonrpc");
jrtok = json_get_member(plugin->rpc_buffer + plugin->rpc_read_offset,
plugin->rpc_toks, "jsonrpc");
if (!jrtok) {
plugin_err(plugin, "JSON-RPC message does not contain \"jsonrpc\" field: '%.*s'",
(int)plugin->rpc_used, plugin->rpc_buffer);
return false;
(int)(plugin->rpc_used - plugin->rpc_read_offset),
plugin->rpc_buffer + plugin->rpc_read_offset);
}
handle_rpc_reply(plugin, plugin->rpc_toks);
/* Move this object out of the buffer */
memmove(plugin->rpc_buffer, plugin->rpc_buffer + plugin->rpc_toks[0].end,
tal_count(plugin->rpc_buffer) - plugin->rpc_toks[0].end);
plugin->rpc_used -= plugin->rpc_toks[0].end;
plugin->rpc_read_offset += plugin->rpc_toks[0].end;
jsmn_init(&plugin->rpc_parser);
toks_reset(plugin->rpc_toks);
return true;
compact:
memmove(plugin->rpc_buffer, plugin->rpc_buffer + plugin->rpc_read_offset,
plugin->rpc_used - plugin->rpc_read_offset);
plugin->rpc_used -= plugin->rpc_read_offset;
plugin->rpc_read_offset = 0;
return false;
}
static struct io_plan *rpc_conn_read_response(struct io_conn *conn,
@@ -1632,6 +1635,7 @@ static struct plugin *new_plugin(const tal_t *ctx,
p->rpc_buffer = tal_arr(p, char, 64);
p->rpc_js_arr = tal_arr(p, struct json_stream *, 0);
p->rpc_used = 0;
p->rpc_read_offset = 0;
p->rpc_len_read = 0;
jsmn_init(&p->rpc_parser);
p->rpc_toks = toks_alloc(p);