diff --git a/connectd/connectd.c b/connectd/connectd.c index 367253f03..dc0c2f798 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -307,13 +307,12 @@ static struct peer *new_peer(struct daemon *daemon, peer->id = *id; peer->cs = *cs; peer->final_msg = NULL; - peer->subd_in = NULL; + peer->subds = tal_arr(peer, struct subd *, 0); peer->peer_in = NULL; peer->sent_to_peer = NULL; peer->urgent = false; peer->told_to_close = false; peer->peer_outq = msg_queue_new(peer, false); - peer->subd_outq = msg_queue_new(peer, false); #if DEVELOPER peer->dev_writes_enabled = NULL; @@ -323,7 +322,7 @@ static struct peer *new_peer(struct daemon *daemon, peer->to_peer = conn; /* Aim for connection to shuffle data back and forth: sets up - * peer->to_subd */ + * peer->subds[0] */ if (!multiplex_subd_setup(peer, fd_for_subd)) return tal_free(peer); @@ -1792,7 +1791,7 @@ static void try_connect_peer(struct daemon *daemon, existing = peer_htable_get(&daemon->peers, id); if (existing) { /* If it's exiting now, we've raced: reconnect after */ - if (existing->to_subd + if (tal_count(existing->subds) != 0 && existing->to_peer && !existing->told_to_close) return; @@ -1892,7 +1891,7 @@ void peer_conn_closed(struct peer *peer) struct connecting *connect = find_connecting(peer->daemon, &peer->id); /* These should be closed already! */ - assert(!peer->to_subd); + assert(!peer->subds); assert(!peer->to_peer); assert(peer->told_to_close); diff --git a/connectd/connectd.h b/connectd/connectd.h index 9cecc6194..e75337f34 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -52,8 +53,8 @@ struct peer { /* Connection to the peer */ struct io_conn *to_peer; - /* Connection to the subdaemon */ - struct io_conn *to_subd; + /* Connections to the subdaemons */ + struct subd **subds; /* Final message to send to peer (and hangup) */ u8 *final_msg; @@ -64,11 +65,11 @@ struct peer { /* When socket has Nagle overridden */ bool urgent; - /* Input buffers. */ - u8 *subd_in, *peer_in; + /* Input buffer. */ + u8 *peer_in; - /* Output buffers. */ - struct msg_queue *subd_outq, *peer_outq; + /* Output buffer. */ + struct msg_queue *peer_outq; /* Peer sent buffer (for freeing after sending) */ const u8 *sent_to_peer; diff --git a/connectd/multiplex.c b/connectd/multiplex.c index c6a3e47ea..91f221b56 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -36,6 +36,26 @@ #include #include +struct subd { + /* Owner: we are in peer->subds[] */ + struct peer *peer; + + /* The temporary or permanant channel_id */ + struct channel_id channel_id; + + /* In passing, we can have a temporary one, too. */ + struct channel_id *temporary_channel_id; + + /* The actual connection to talk to it */ + struct io_conn *conn; + + /* Input buffer */ + u8 *in; + + /* Output buffer */ + struct msg_queue *outq; +}; + void inject_peer_msg(struct peer *peer, const u8 *msg TAKES) { status_peer_io(LOG_IO_OUT, &peer->id, msg); @@ -51,10 +71,10 @@ static void send_warning(struct peer *peer, const char *fmt, ...) status_vfmt(LOG_UNUSUAL, &peer->id, fmt, ap); va_end(ap); - /* Close locally, send msg as final warning */ - io_close(peer->to_subd); - peer->to_subd = NULL; + /* Close to any subdaemons. */ + peer->subds = tal_free(peer->subds); + /* Send warning as final message. */ va_start(ap, fmt); peer->final_msg = towire_warningfmtv(peer, NULL, fmt, ap); va_end(ap); @@ -573,7 +593,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, msg = msg_dequeue(peer->peer_outq); /* Is it time to send final? */ - if (!msg && peer->final_msg && !peer->to_subd) { + if (!msg && peer->final_msg && !peer->subds) { /* OK, send this then close. */ msg = peer->final_msg; peer->final_msg = NULL; @@ -584,7 +604,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, /* Still nothing to send? */ if (!msg) { /* We close once subds are all closed. */ - if (!peer->to_subd) { + if (!peer->subds) { set_closing_timer(peer, peer_conn); return io_sock_shutdown(peer_conn); } @@ -593,7 +613,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, msg = maybe_from_gossip_store(NULL, peer); if (!msg) { /* Tell them to read again, */ - io_wake(&peer->subd_in); + io_wake(&peer->subds); /* Wait for them to wake us */ return msg_queue_wait(peer_conn, peer->peer_outq, @@ -617,50 +637,59 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, } static struct io_plan *read_from_subd(struct io_conn *subd_conn, - struct peer *peer); + struct subd *subd); static struct io_plan *read_from_subd_done(struct io_conn *subd_conn, - struct peer *peer) + struct subd *subd) { /* Tell them to encrypt & write. */ - msg_enqueue(peer->peer_outq, take(peer->subd_in)); - peer->subd_in = NULL; + msg_enqueue(subd->peer->peer_outq, take(subd->in)); + subd->in = NULL; /* Wait for them to wake us */ - return io_wait(subd_conn, &peer->subd_in, read_from_subd, peer); + return io_wait(subd_conn, &subd->peer->subds, read_from_subd, subd); } static struct io_plan *read_from_subd(struct io_conn *subd_conn, - struct peer *peer) + struct subd *subd) { - return io_read_wire(subd_conn, peer, &peer->subd_in, - read_from_subd_done, peer); + return io_read_wire(subd_conn, subd, &subd->in, + read_from_subd_done, subd); } /* These four function handle peer->subd */ static struct io_plan *write_to_subd(struct io_conn *subd_conn, - struct peer *peer) + struct subd *subd) { const u8 *msg; - assert(peer->to_subd == subd_conn); + assert(subd->conn == subd_conn); /* Pop tail of send queue */ - msg = msg_dequeue(peer->subd_outq); + msg = msg_dequeue(subd->outq); /* Nothing to send? */ if (!msg) { /* If peer is closed, close this. */ - if (!peer->to_peer) + if (!subd->peer->to_peer) return io_close(subd_conn); /* Tell them to read again. */ - io_wake(&peer->peer_in); + io_wake(&subd->peer->peer_in); /* Wait for them to wake us */ - return msg_queue_wait(subd_conn, peer->subd_outq, - write_to_subd, peer); + return msg_queue_wait(subd_conn, subd->outq, + write_to_subd, subd); } - return io_write_wire(subd_conn, take(msg), write_to_subd, peer); + return io_write_wire(subd_conn, take(msg), write_to_subd, subd); +} + +/* FIXME: We only currently have one subd */ +static struct subd *find_subd(struct peer *peer, + const struct channel_id *channel_id) +{ + if (tal_count(peer->subds) == 0) + return NULL; + return peer->subds[0]; } static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn, @@ -669,6 +698,8 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, struct peer *peer) { u8 *decrypted; + struct channel_id channel_id; + struct subd *subd; decrypted = cryptomsg_decrypt_body(tmpctx, &peer->cs, peer->peer_in); @@ -691,12 +722,39 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, if (handle_message_locally(peer, decrypted)) return read_hdr_from_peer(peer_conn, peer); - /* If there's no subd, discard and keep reading. */ - if (!peer->to_subd) + /* After this we should be able to match to subd by channel_id */ + if (!extract_channel_id(decrypted, &channel_id)) { + enum peer_wire type = fromwire_peektype(decrypted); + + /* We won't log this anywhere else, so do it here. */ + status_peer_io(LOG_IO_IN, &peer->id, decrypted); + + /* Could be a all-channel error or warning? Log it + * more verbose, and hang up. */ + if (type == WIRE_ERROR || type == WIRE_WARNING) { + char *desc = sanitize_error(tmpctx, decrypted, NULL); + status_peer_info(&peer->id, + "Received %s: %s", + peer_wire_name(type), desc); + return io_close(peer_conn); + } + + /* This sets final_msg: will close after sending warning */ + send_warning(peer, "Unexpected message %s: %s", + peer_wire_name(type), + tal_hex(tmpctx, decrypted)); + io_wake(peer->peer_outq); + + return read_hdr_from_peer(peer_conn, peer); + } + + /* If we don't find a subdaemon for this, discard and keep reading. */ + subd = find_subd(peer, &channel_id); + if (!subd) return read_hdr_from_peer(peer_conn, peer); /* Tell them to write. */ - msg_enqueue(peer->subd_outq, take(decrypted)); + msg_enqueue(subd->outq, take(decrypted)); /* Wait for them to wake us */ return io_wait(peer_conn, &peer->peer_in, read_hdr_from_peer, peer); @@ -734,21 +792,33 @@ static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn, read_body_from_peer, peer); } -static struct io_plan *subd_conn_init(struct io_conn *subd_conn, struct peer *peer) +static struct io_plan *subd_conn_init(struct io_conn *subd_conn, + struct subd *subd) { - peer->to_subd = subd_conn; + subd->conn = subd_conn; return io_duplex(subd_conn, - read_from_subd(subd_conn, peer), - write_to_subd(subd_conn, peer)); + read_from_subd(subd_conn, subd), + write_to_subd(subd_conn, subd)); } -static void destroy_subd_conn(struct io_conn *subd_conn, struct peer *peer) +static void destroy_subd(struct subd *subd) { - assert(subd_conn == peer->to_subd); - peer->to_subd = NULL; - /* In case they were waiting for this to send final_msg */ - if (peer->final_msg) - msg_wake(peer->peer_outq); + struct peer *peer = subd->peer; + size_t pos; + + for (pos = 0; peer->subds[pos] != subd; pos++) + assert(pos < tal_count(peer->subds)); + + tal_arr_remove(&peer->subds, pos); + + /* Last one out frees array, sets to NULL as an indicator */ + if (tal_count(peer->subds) == 0) { + peer->subds = tal_free(peer->subds); + + /* In case they were waiting for this to send final_msg */ + if (peer->final_msg) + msg_wake(peer->peer_outq); + } /* Make sure we try to keep reading from peer, so we know if * it hangs up! */ @@ -765,7 +835,7 @@ void close_peer_conn(struct peer *peer) peer->told_to_close = true; /* Already dead? */ - if (!peer->to_subd && !peer->to_peer) { + if (!peer->subds && !peer->to_peer) { peer_conn_closed(peer); return; } @@ -777,14 +847,26 @@ void close_peer_conn(struct peer *peer) bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd) { int fds[2]; + struct subd *subd; if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { status_broken("Failed to create socketpair: %s", strerror(errno)); return false; } - peer->to_subd = io_new_conn(peer, fds[0], subd_conn_init, peer); - tal_add_destructor2(peer->to_subd, destroy_subd_conn, peer); + + subd = tal(peer->subds, struct subd); + subd->peer = peer; + subd->outq = msg_queue_new(subd, false); + /* This sets subd->conn inside subd_conn_init */ + io_new_conn(peer, fds[0], subd_conn_init, subd); + /* When conn dies, subd is freed. */ + tal_steal(subd->conn, subd); + + /* Connect it to the peer */ + tal_arr_expand(&peer->subds, subd); + tal_add_destructor(subd, destroy_subd); + *fd_for_subd = fds[1]; return true; } @@ -795,8 +877,9 @@ static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer) peer->to_peer = NULL; /* Flush internal connections if not already. */ - if (peer->to_subd) { - msg_wake(peer->subd_outq); + if (peer->subds) { + for (size_t i = 0; i < tal_count(peer->subds); i++) + msg_wake(peer->subds[i]->outq); return; } @@ -824,7 +907,7 @@ void multiplex_final_msg(struct peer *peer, const u8 *final_msg TAKES) { peer->told_to_close = true; peer->final_msg = tal_dup_talarr(peer, u8, final_msg); - if (!peer->to_subd) + if (!peer->subds) io_wake(peer->peer_outq); } diff --git a/tests/test_plugin.py b/tests/test_plugin.py index c6b0b15bb..3c020bd81 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -440,7 +440,7 @@ def test_plugin_connected_hook_chaining(node_factory): ]) # FIXME: this error occurs *after* connection, so we connect then drop. - l3.daemon.wait_for_log(r"chan#1: peer_in WIRE_WARNING") + l3.daemon.wait_for_log(r"-connectd: peer_in WIRE_WARNING") l3.daemon.wait_for_log(r"You are in reject list") def check_disconnect():