#include "config.h" #include /* To reach into io_plan: not a public header! */ #include #include #include static void adjust_io_write(struct json_out *jout, ptrdiff_t delta, struct json_stream *js) { /* If io_write is in progress, we shift it to point to new buffer pos */ if (js->reader) /* FIXME: This, or something prettier (io_replan?) belong in ccan/io! */ js->reader->plan[IO_OUT].arg.u1.cp += delta; } struct json_stream *new_json_stream(const tal_t *ctx, struct command *writer, struct log *log) { struct json_stream *js = tal(ctx, struct json_stream); /* FIXME: Add magic so tal_resize can fail! */ js->jout = json_out_new(js); json_out_call_on_move(js->jout, adjust_io_write, js); js->writer = writer; js->reader = NULL; js->log = log; return js; } struct json_stream *json_stream_dup(const tal_t *ctx, struct json_stream *original, struct log *log) { struct json_stream *js = tal_dup(ctx, struct json_stream, original); js->jout = json_out_dup(js, original->jout); js->log = log; return js; } /** * json_stream_still_writing - is someone currently writing to this stream? * @js: the json_stream. * * Has this json_stream not been closed yet? */ static bool json_stream_still_writing(const struct json_stream *js) { return js->writer != NULL; } void json_stream_log_suppress(struct json_stream *js, const char *cmd_name) { /* Really shouldn't be used for anything else */ assert(streq(cmd_name, "getlog")); js->log = NULL; } void json_stream_append(struct json_stream *js, const char *str, size_t len) { char *dest; dest = json_out_direct(js->jout, len); memcpy(dest, str, len); } /* We promise it will end in '\n\n' */ void json_stream_double_cr(struct json_stream *js) { const char *contents; size_t len, cr_needed; /* Must be well-formed at this point! */ json_out_finished(js->jout); contents = json_out_contents(js->jout, &len); /* It's an object (with an id!): definitely can't be less that "{}" */ assert(len >= 2); if (contents[len-1] == '\n') { if (contents[len-2] == '\n') return; cr_needed = 1; } else cr_needed = 2; json_stream_append(js, "\n\n", cr_needed); } void json_stream_close(struct json_stream *js, struct command *writer) { /* FIXME: We use writer == NULL for malformed: make writer a void *? * I used to assert(writer); here. */ assert(js->writer == writer); /* Should be well-formed at this point! */ json_stream_double_cr(js); json_stream_flush(js); js->writer = NULL; } /* Also called when we're oom, so it will kill reader. */ void json_stream_flush(struct json_stream *js) { /* Wake the stream reader. FIXME: Could have a flag here to optimize */ io_wake(js); } char *json_member_direct(struct json_stream *js, const char *fieldname, size_t extra) { char *dest; dest = json_out_member_direct(js->jout, fieldname, extra); return dest; } void json_array_start(struct json_stream *js, const char *fieldname) { json_out_start(js->jout, fieldname, '['); } void json_array_end(struct json_stream *js) { json_out_end(js->jout, ']'); } void json_object_start(struct json_stream *js, const char *fieldname) { json_out_start(js->jout, fieldname, '{'); } void json_object_end(struct json_stream *js) { json_out_end(js->jout, '}'); } void json_add_member(struct json_stream *js, const char *fieldname, bool quote, const char *fmt, ...) { va_list ap; va_start(ap, fmt); json_out_addv(js->jout, fieldname, quote, fmt, ap); va_end(ap); } void json_add_jsonstr(struct json_stream *js, const char *fieldname, const char *jsonstr) { char *p; size_t len = strlen(jsonstr); p = json_member_direct(js, fieldname, len); memcpy(p, jsonstr, len); } /* This is where we read the json_stream and write it to conn */ static struct io_plan *json_stream_output_write(struct io_conn *conn, struct json_stream *js) { const char *p; /* For when we've just done some output */ json_out_consume(js->jout, js->len_read); /* Get how much we can write out from js */ p = json_out_contents(js->jout, &js->len_read); /* Nothing in buffer? */ if (!p) { /* We're not doing io_write now, unset. */ js->reader = NULL; if (!json_stream_still_writing(js)) return js->reader_cb(conn, js, js->reader_arg); return io_out_wait(conn, js, json_stream_output_write, js); } js->reader = conn; return io_write(conn, p, js->len_read, json_stream_output_write, js); } struct io_plan *json_stream_output_(struct json_stream *js, struct io_conn *conn, struct io_plan *(*cb)(struct io_conn *conn, struct json_stream *js, void *arg), void *arg) { assert(!js->reader); js->reader_cb = cb; js->reader_arg = arg; js->len_read = 0; return json_stream_output_write(conn, js); }