lightningd/subd: support multiple fds sent at once in normal messages.

Rather than returning SUBD_NEED_FD, callback returns how many fds it needs.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2017-03-20 07:01:35 +10:30
parent 38bffc0f0c
commit 7a9df37ef3
5 changed files with 65 additions and 64 deletions

View File

@@ -47,8 +47,7 @@ struct subd_req {
bool (*replycb)(struct subd *, const u8 *, const int *, void *);
void *replycb_data;
size_t num_fds_read;
int *fds_in;
size_t num_reply_fds;
};
static void free_subd_req(struct subd_req *sr)
@@ -66,8 +65,7 @@ static void add_req(struct subd *sd, int type, size_t num_fds_in,
sr->reply_type = type + SUBD_REPLY_OFFSET;
sr->replycb = replycb;
sr->replycb_data = replycb_data;
sr->fds_in = num_fds_in ? tal_arr(sr, int, num_fds_in) : NULL;
sr->num_fds_read = 0;
sr->num_reply_fds = num_fds_in;
assert(strends(sd->msgname(sr->reply_type), "_REPLY"));
/* Keep in FIFO order: we sent in order, so replies will be too. */
@@ -185,26 +183,46 @@ static struct io_plan *sd_msg_reply(struct io_conn *conn, struct subd *sd,
{
int type = fromwire_peektype(sd->msg_in);
bool keep_open;
size_t i;
log_info(sd->log, "REPLY %s with %zu fds",
sd->msgname(type), tal_count(sr->fds_in));
/* Don't trust subd to set it blocking. */
for (i = 0; i < tal_count(sr->fds_in); i++)
set_blocking(sr->fds_in[i], true);
sd->msgname(type), tal_count(sd->fds_in));
/* If not stolen, we'll free this below. */
tal_steal(sr, sd->msg_in);
keep_open = sr->replycb(sd, sd->msg_in, sr->fds_in, sr->replycb_data);
keep_open = sr->replycb(sd, sd->msg_in, sd->fds_in, sr->replycb_data);
tal_free(sr);
if (!keep_open)
return io_close(conn);
/* Free any fd array. */
sd->fds_in = tal_free(sd->fds_in);
return io_read_wire(conn, sd, &sd->msg_in, sd_msg_read, sd);
}
static struct io_plan *read_fds(struct io_conn *conn, struct subd *sd)
{
if (sd->num_fds_in_read == tal_count(sd->fds_in)) {
size_t i;
/* Don't trust subd to set it blocking. */
for (i = 0; i < tal_count(sd->fds_in); i++)
set_blocking(sd->fds_in[i], true);
return sd_msg_read(conn, sd);
}
return io_recv_fd(conn, &sd->fds_in[sd->num_fds_in_read++],
read_fds, sd);
}
static struct io_plan *sd_collect_fds(struct io_conn *conn, struct subd *sd,
size_t num_fds)
{
assert(!sd->fds_in);
sd->fds_in = tal_arr(sd, int, num_fds);
sd->num_fds_in_read = 0;
return read_fds(conn, sd);
}
static struct io_plan *sd_msg_read(struct io_conn *conn, struct subd *sd)
{
int type = fromwire_peektype(sd->msg_in);
@@ -221,11 +239,10 @@ static struct io_plan *sd_msg_read(struct io_conn *conn, struct subd *sd)
/* First, check for replies. */
sr = get_req(sd, type);
if (sr) {
/* If we need (another) fd, read it and call us again. */
if (sr->num_fds_read < tal_count(sr->fds_in)) {
return io_recv_fd(conn, &sr->fds_in[sr->num_fds_read++],
sd_msg_read, sd);
}
if (sr->num_reply_fds && sd->fds_in == NULL)
return sd_collect_fds(conn, sd, sr->num_reply_fds);
assert(sr->num_reply_fds == tal_count(sd->fds_in));
return sd_msg_reply(conn, sd, sr);
}
@@ -244,30 +261,21 @@ static struct io_plan *sd_msg_read(struct io_conn *conn, struct subd *sd)
sd->msgname(type), str_len, str);
else {
log_info(sd->log, "UPDATE %s", sd->msgname(type));
if (sd->msgcb) {
enum subd_msg_ret r;
/* If received from subd, set blocking. */
if (sd->fd_in != -1)
set_blocking(sd->fd_in, true);
r = sd->msgcb(sd, sd->msg_in, sd->fd_in);
switch (r) {
case SUBD_NEED_FD:
if (sd->msgcb) {
size_t i = sd->msgcb(sd, sd->msg_in, sd->fds_in);
if (i != 0) {
/* Don't ask for fds twice! */
assert(!sd->fds_in);
/* Don't free msg_in: we go around again. */
tal_steal(sd, sd->msg_in);
tal_free(tmpctx);
return io_recv_fd(conn, &sd->fd_in,
sd_msg_read, sd);
case SUBD_COMPLETE:
break;
default:
fatal("Unknown msgcb return for %s:%s: %u",
sd->name, sd->msgname(type), r);
return sd_collect_fds(conn, sd, i);
}
}
}
sd->msg_in = NULL;
sd->fd_in = -1;
sd->fds_in = tal_free(sd->fds_in);
tal_free(tmpctx);
return io_read_wire(conn, sd, &sd->msg_in, sd_msg_read, sd);
}
@@ -321,8 +329,8 @@ struct subd *new_subd(const tal_t *ctx,
const char *name,
struct peer *peer,
const char *(*msgname)(int msgtype),
enum subd_msg_ret (*msgcb)
(struct subd *, const u8 *, int fd),
size_t (*msgcb)(struct subd *, const u8 *,
const int *fds),
void (*finished)(struct subd *, int),
...)
{
@@ -347,7 +355,7 @@ struct subd *new_subd(const tal_t *ctx,
sd->finished = finished;
sd->msgname = msgname;
sd->msgcb = msgcb;
sd->fd_in = -1;
sd->fds_in = NULL;
msg_queue_init(&sd->outq, sd);
tal_add_destructor(sd, destroy_subd);
list_head_init(&sd->reqs);