json_stream: disentangle JSON handling from command.

We promote 'struct json_stream' to contain the membuf; we only attach
the json_stream to the command when we actually call
json_stream_success / json_stream_fail.

This means we are closer to 'struct json_stream' being an independent
layer; the tests are already modified to use it directly to create
JSON.

This is also the first step toward re-enabling non-serial command
execution.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2018-11-20 12:16:32 +10:30
committed by Christian Decker
parent db3f0ba965
commit e17f69ce2d
12 changed files with 499 additions and 356 deletions

View File

@@ -6,7 +6,6 @@
#include <bitcoin/script.h>
#include <ccan/array_size/array_size.h>
#include <ccan/err/err.h>
#include <ccan/io/backend.h>
#include <ccan/io/io.h>
#include <ccan/str/hex/hex.h>
#include <ccan/tal/str/str.h>
@@ -32,16 +31,6 @@
#include <sys/types.h>
#include <sys/un.h>
/* Realloc helper for tal membufs */
static void *membuf_tal_realloc(struct membuf *mb,
void *rawelems, size_t newsize)
{
char *p = rawelems;
tal_resize(&p, newsize);
return p;
}
/* jcon and cmd have separate lifetimes: we detach them on either destruction */
static void destroy_jcon(struct json_connection *jcon)
{
@@ -54,12 +43,6 @@ static void destroy_jcon(struct json_connection *jcon)
tal_free(jcon->log);
}
/* FIXME: This, or something prettier (io_replan?) belong in ccan/io! */
static void adjust_io_write(struct io_conn *conn, ptrdiff_t delta)
{
conn->plan[IO_OUT].arg.u1.cp += delta;
}
static void json_help(struct command *cmd,
const char *buffer, const jsmntok_t *params);
@@ -283,57 +266,6 @@ static const struct json_command *find_cmd(const char *buffer,
return NULL;
}
/* Make sure jcon->outbuf has room for len */
static void json_connection_mkroom(struct json_connection *jcon, size_t len)
{
ptrdiff_t delta = membuf_prepare_space(&jcon->outbuf, len);
/* If io_write is in progress, we shift it to point to new buffer pos */
if (io_lock_taken(jcon->lock))
adjust_io_write(jcon->conn, delta);
}
void jcon_append(struct json_connection *jcon, const char *str)
{
size_t len = strlen(str);
json_connection_mkroom(jcon, len);
memcpy(membuf_add(&jcon->outbuf, len), str, len);
/* Wake writer. */
io_wake(jcon);
}
void jcon_append_vfmt(struct json_connection *jcon, const char *fmt, va_list ap)
{
size_t fmtlen;
va_list ap2;
/* Make a copy in case we need it below. */
va_copy(ap2, ap);
/* Try printing in place first. */
fmtlen = vsnprintf(membuf_space(&jcon->outbuf),
membuf_num_space(&jcon->outbuf), fmt, ap);
/* Horrible subtlety: vsnprintf *will* NUL terminate, even if it means
* chopping off the last character. So if fmtlen ==
* membuf_num_space(&jcon->outbuf), the result was truncated! */
if (fmtlen < membuf_num_space(&jcon->outbuf)) {
membuf_added(&jcon->outbuf, fmtlen);
} else {
/* Make room for NUL terminator, even though we don't want it */
json_connection_mkroom(jcon, fmtlen + 1);
vsprintf(membuf_space(&jcon->outbuf), fmt, ap2);
membuf_added(&jcon->outbuf, fmtlen);
}
va_end(ap2);
/* Wake writer. */
io_wake(jcon);
}
struct json_stream *null_response(struct command *cmd)
{
struct json_stream *response;
@@ -357,27 +289,34 @@ static void destroy_command(struct command *cmd)
cmd->jcon->command = NULL;
}
/* FIXME: Remove result arg here! */
void command_success(struct command *cmd, struct json_stream *result)
{
assert(cmd);
assert(cmd->have_json_stream);
if (cmd->jcon)
jcon_append(cmd->jcon, " }\n\n");
json_stream_append(result, " }\n\n");
json_stream_close(result, cmd);
if (cmd->ok)
*(cmd->ok) = true;
/* If we have a jcon, it will free result for us. */
if (cmd->jcon)
tal_steal(cmd->jcon, result);
tal_free(cmd);
}
/* FIXME: Remove result arg here! */
void command_failed(struct command *cmd, struct json_stream *result)
{
assert(cmd->have_json_stream);
/* Have to close error */
if (cmd->jcon)
jcon_append(cmd->jcon, " } }\n\n");
json_stream_append(result, " } }\n\n");
json_stream_close(result, cmd);
if (cmd->ok)
*(cmd->ok) = false;
/* If we have a jcon, it will free result for us. */
if (cmd->jcon)
tal_steal(cmd->jcon, result);
tal_free(cmd);
}
@@ -403,23 +342,23 @@ void command_still_pending(struct command *cmd)
cmd->pending = true;
}
static void jcon_start(struct json_connection *jcon, const char *id)
{
jcon_append(jcon, "{ \"jsonrpc\": \"2.0\", \"id\" : ");
jcon_append(jcon, id);
jcon_append(jcon, ", ");
}
static void json_command_malformed(struct json_connection *jcon,
const char *id,
const char *error)
{
jcon_start(jcon, id);
jcon_append(jcon,
tal_fmt(tmpctx, " \"error\" : "
"{ \"code\" : %d,"
" \"message\" : \"%s\" } }\n\n",
JSONRPC2_INVALID_REQUEST, error));
/* FIXME: We only allow one command at a time */
assert(!jcon->js);
jcon->js = new_json_stream(jcon, NULL);
io_wake(jcon);
json_stream_append_fmt(jcon->js,
"{ \"jsonrpc\": \"2.0\", \"id\" : %s,"
" \"error\" : "
"{ \"code\" : %d,"
" \"message\" : \"%s\" } }\n\n",
id, JSONRPC2_INVALID_REQUEST, error);
json_stream_close(jcon->js, NULL);
}
/* Returns true if command already completed. */
@@ -463,9 +402,6 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[])
jcon->command = c;
tal_add_destructor(c, destroy_command);
/* Write start of response: rest will be appended directly. */
jcon_start(jcon, c->id);
if (!method || !params) {
command_fail(c, JSONRPC2_INVALID_REQUEST,
method ? "No params" : "No method");
@@ -506,19 +442,28 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[])
}
/* Mutual recursion */
static struct io_plan *locked_write_json(struct io_conn *conn,
struct json_connection *jcon);
static struct io_plan *write_json(struct io_conn *conn,
struct json_connection *jcon);
static struct io_plan *stream_out_complete(struct io_conn *conn,
struct json_connection *jcon);
static struct io_plan *write_json_done(struct io_conn *conn,
struct json_connection *jcon)
static struct io_plan *start_json_stream(struct io_conn *conn,
struct json_connection *jcon)
{
membuf_consume(&jcon->outbuf, jcon->out_amount);
/* If something has created an output buffer, start streaming. */
if (jcon->js)
return json_stream_output(jcon->js, conn,
stream_out_complete, jcon);
/* If we have more to write, do it now. */
if (membuf_num_elems(&jcon->outbuf))
return write_json(conn, jcon);
/* Wait for attach_json_stream */
return io_out_wait(conn, jcon, start_json_stream, jcon);
}
/* Command has completed writing, and we've written it all out to conn. */
static struct io_plan *stream_out_complete(struct io_conn *conn,
struct json_connection *jcon)
{
/* Free up the json_stream for next command. */
assert(jcon->js);
jcon->js = tal_free(jcon->js);
if (jcon->stop) {
log_unusual(jcon->log, "JSON-RPC shutdown");
@@ -527,29 +472,11 @@ static struct io_plan *write_json_done(struct io_conn *conn,
return io_close(conn);
}
/* If command is done and we've output everything, wake read_json
* for next command. */
if (!jcon->command)
io_wake(conn);
/* Tell reader to run next command. */
io_wake(conn);
io_lock_release(jcon->lock);
/* Wait for more output. */
return io_out_wait(conn, jcon, locked_write_json, jcon);
}
static struct io_plan *write_json(struct io_conn *conn,
struct json_connection *jcon)
{
jcon->out_amount = membuf_num_elems(&jcon->outbuf);
return io_write(conn,
membuf_elems(&jcon->outbuf), jcon->out_amount,
write_json_done, jcon);
}
static struct io_plan *locked_write_json(struct io_conn *conn,
struct json_connection *jcon)
{
return io_lock_acquire_out(conn, jcon->lock, write_json, jcon);
return start_json_stream(conn, jcon);
}
static struct io_plan *read_json(struct io_conn *conn,
@@ -567,6 +494,12 @@ static struct io_plan *read_json(struct io_conn *conn,
if (jcon->used == tal_count(jcon->buffer))
tal_resize(&jcon->buffer, jcon->used * 2);
/* We wait for pending output to be consumed, to avoid DoS */
if (jcon->js) {
jcon->len_read = 0;
return io_wait(conn, conn, read_json, jcon);
}
toks = json_parse_input(jcon->buffer, jcon->used, &valid);
if (!toks) {
if (!valid) {
@@ -629,9 +562,7 @@ static struct io_plan *jcon_connected(struct io_conn *conn,
jcon->used = 0;
jcon->buffer = tal_arr(jcon, char, 64);
jcon->stop = false;
jcon->lock = io_lock_new(jcon);
membuf_init(&jcon->outbuf,
tal_arr(jcon, char, 64), 64, membuf_tal_realloc);
jcon->js = NULL;
jcon->len_read = 0;
jcon->command = NULL;
@@ -650,7 +581,7 @@ static struct io_plan *jcon_connected(struct io_conn *conn,
* pattern would break down, so we use this trick. */
return io_duplex(conn,
read_json(conn, jcon),
io_out_wait(conn, jcon, locked_write_json, jcon));
start_json_stream(conn, jcon));
}
static struct io_plan *incoming_jcon_connected(struct io_conn *conn,