mirror of
https://github.com/aljazceru/lightning.git
synced 2025-12-19 07:04:22 +01:00
common: move json_stream from lightningd/ to common/
It's not lightningd-specific and we are going to need it for libplugin. The only drawback is the log_io removal in json_stream_output_write()..
This commit is contained in:
228
common/json_stream.c
Normal file
228
common/json_stream.c
Normal file
@@ -0,0 +1,228 @@
|
||||
#include <ccan/io/io.h>
|
||||
/* To reach into io_plan: not a public header! */
|
||||
#include <ccan/io/backend.h>
|
||||
#include <ccan/json_escape/json_escape.h>
|
||||
#include <ccan/json_out/json_out.h>
|
||||
#include <ccan/str/hex/hex.h>
|
||||
#include <ccan/tal/str/str.h>
|
||||
#include <common/daemon.h>
|
||||
#include <common/json_stream.h>
|
||||
#include <common/utils.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdio.h>
|
||||
|
||||
struct json_stream {
|
||||
/* NULL if we ran OOM! */
|
||||
struct json_out *jout;
|
||||
|
||||
/* Who is writing to this buffer now; NULL if nobody is. */
|
||||
struct command *writer;
|
||||
|
||||
/* Who is io_writing from this buffer now: NULL if nobody is. */
|
||||
struct io_conn *reader;
|
||||
struct io_plan *(*reader_cb)(struct io_conn *conn,
|
||||
struct json_stream *js,
|
||||
void *arg);
|
||||
void *reader_arg;
|
||||
size_t len_read;
|
||||
|
||||
/* Where to log I/O */
|
||||
struct log *log;
|
||||
};
|
||||
|
||||
static void adjust_io_write(struct json_out *jout,
|
||||
ptrdiff_t delta,
|
||||
struct json_stream *js)
|
||||
{
|
||||
/* If io_write is in progress, we shift it to point to new buffer pos */
|
||||
if (js->reader)
|
||||
/* FIXME: This, or something prettier (io_replan?) belong in ccan/io! */
|
||||
js->reader->plan[IO_OUT].arg.u1.cp += delta;
|
||||
}
|
||||
|
||||
struct json_stream *new_json_stream(const tal_t *ctx,
|
||||
struct command *writer,
|
||||
struct log *log)
|
||||
{
|
||||
struct json_stream *js = tal(ctx, struct json_stream);
|
||||
|
||||
/* FIXME: Add magic so tal_resize can fail! */
|
||||
js->jout = json_out_new(js);
|
||||
json_out_call_on_move(js->jout, adjust_io_write, js);
|
||||
js->writer = writer;
|
||||
js->reader = NULL;
|
||||
js->log = log;
|
||||
return js;
|
||||
}
|
||||
|
||||
struct json_stream *json_stream_dup(const tal_t *ctx,
|
||||
struct json_stream *original,
|
||||
struct log *log)
|
||||
{
|
||||
struct json_stream *js = tal_dup(ctx, struct json_stream, original);
|
||||
|
||||
if (original->jout)
|
||||
js->jout = json_out_dup(js, original->jout);
|
||||
js->log = log;
|
||||
return js;
|
||||
}
|
||||
|
||||
bool json_stream_still_writing(const struct json_stream *js)
|
||||
{
|
||||
return js->writer != NULL;
|
||||
}
|
||||
|
||||
void json_stream_log_suppress(struct json_stream *js, const char *cmd_name)
|
||||
{
|
||||
/* Really shouldn't be used for anything else */
|
||||
assert(streq(cmd_name, "getlog"));
|
||||
js->log = NULL;
|
||||
}
|
||||
|
||||
/* If we have an allocation failure. */
|
||||
static void COLD js_oom(struct json_stream *js)
|
||||
{
|
||||
js->jout = tal_free(js->jout);
|
||||
}
|
||||
|
||||
void json_stream_append(struct json_stream *js,
|
||||
const char *str, size_t len)
|
||||
{
|
||||
char *dest;
|
||||
|
||||
if (!js->jout)
|
||||
return;
|
||||
dest = json_out_direct(js->jout, len);
|
||||
if (!dest) {
|
||||
js_oom(js);
|
||||
return;
|
||||
}
|
||||
memcpy(dest, str, len);
|
||||
}
|
||||
|
||||
void json_stream_close(struct json_stream *js, struct command *writer)
|
||||
{
|
||||
/* FIXME: We use writer == NULL for malformed: make writer a void *?
|
||||
* I used to assert(writer); here. */
|
||||
assert(js->writer == writer);
|
||||
|
||||
/* Should be well-formed at this point! */
|
||||
json_out_finished(js->jout);
|
||||
json_stream_append(js, "\n\n", strlen("\n\n"));
|
||||
json_stream_flush(js);
|
||||
js->writer = NULL;
|
||||
}
|
||||
|
||||
/* Also called when we're oom, so it will kill reader. */
|
||||
void json_stream_flush(struct json_stream *js)
|
||||
{
|
||||
/* Wake the stream reader. FIXME: Could have a flag here to optimize */
|
||||
io_wake(js);
|
||||
}
|
||||
|
||||
char *json_member_direct(struct json_stream *js,
|
||||
const char *fieldname, size_t extra)
|
||||
{
|
||||
char *dest;
|
||||
|
||||
if (!js->jout)
|
||||
return NULL;
|
||||
|
||||
dest = json_out_member_direct(js->jout, fieldname, extra);
|
||||
if (!dest)
|
||||
js_oom(js);
|
||||
return dest;
|
||||
}
|
||||
|
||||
void json_array_start(struct json_stream *js, const char *fieldname)
|
||||
{
|
||||
if (js->jout && !json_out_start(js->jout, fieldname, '['))
|
||||
js_oom(js);
|
||||
}
|
||||
|
||||
void json_array_end(struct json_stream *js)
|
||||
{
|
||||
if (js->jout && !json_out_end(js->jout, ']'))
|
||||
js_oom(js);
|
||||
}
|
||||
|
||||
void json_object_start(struct json_stream *js, const char *fieldname)
|
||||
{
|
||||
if (js->jout && !json_out_start(js->jout, fieldname, '{'))
|
||||
js_oom(js);
|
||||
}
|
||||
|
||||
void json_object_end(struct json_stream *js)
|
||||
{
|
||||
if (js->jout && !json_out_end(js->jout, '}'))
|
||||
js_oom(js);
|
||||
}
|
||||
|
||||
void json_object_compat_end(struct json_stream *js)
|
||||
{
|
||||
/* In 0.7.1 we upgraded pylightning to no longer need this. */
|
||||
#ifdef COMPAT_V070
|
||||
json_stream_append(js, " ", 1);
|
||||
#endif
|
||||
json_object_end(js);
|
||||
}
|
||||
|
||||
void json_add_member(struct json_stream *js,
|
||||
const char *fieldname,
|
||||
bool quote,
|
||||
const char *fmt, ...)
|
||||
{
|
||||
va_list ap;
|
||||
|
||||
va_start(ap, fmt);
|
||||
if (js->jout && !json_out_addv(js->jout, fieldname, quote, fmt, ap))
|
||||
js_oom(js);
|
||||
va_end(ap);
|
||||
}
|
||||
|
||||
/* This is where we read the json_stream and write it to conn */
|
||||
static struct io_plan *json_stream_output_write(struct io_conn *conn,
|
||||
struct json_stream *js)
|
||||
{
|
||||
const char *p;
|
||||
|
||||
/* Out of memory? Nothing we can do but close conn */
|
||||
if (!js->jout)
|
||||
return io_close(conn);
|
||||
|
||||
/* For when we've just done some output */
|
||||
json_out_consume(js->jout, js->len_read);
|
||||
|
||||
/* Get how much we can write out from js */
|
||||
p = json_out_contents(js->jout, &js->len_read);
|
||||
|
||||
/* Nothing in buffer? */
|
||||
if (!p) {
|
||||
/* We're not doing io_write now, unset. */
|
||||
js->reader = NULL;
|
||||
if (!json_stream_still_writing(js))
|
||||
return js->reader_cb(conn, js, js->reader_arg);
|
||||
return io_out_wait(conn, js, json_stream_output_write, js);
|
||||
}
|
||||
|
||||
js->reader = conn;
|
||||
return io_write(conn,
|
||||
p, js->len_read,
|
||||
json_stream_output_write, js);
|
||||
}
|
||||
|
||||
struct io_plan *json_stream_output_(struct json_stream *js,
|
||||
struct io_conn *conn,
|
||||
struct io_plan *(*cb)(struct io_conn *conn,
|
||||
struct json_stream *js,
|
||||
void *arg),
|
||||
void *arg)
|
||||
{
|
||||
assert(!js->reader);
|
||||
|
||||
js->reader_cb = cb;
|
||||
js->reader_arg = arg;
|
||||
|
||||
js->len_read = 0;
|
||||
return json_stream_output_write(conn, js);
|
||||
}
|
||||
Reference in New Issue
Block a user