diff --git a/connectd/connectd.c b/connectd/connectd.c
index 287c4b5fc..10944467b 100644
--- a/connectd/connectd.c
+++ b/connectd/connectd.c
@@ -355,6 +355,7 @@ static struct peer *new_peer(struct daemon *daemon,
 	peer->peer_in = NULL;
 	peer->sent_to_peer = NULL;
 	peer->urgent = false;
+	peer->told_to_close = false;
 	peer->peer_outq = msg_queue_new(peer, false);
 	peer->subd_outq = msg_queue_new(peer, false);
 
@@ -1822,6 +1823,18 @@ static void connect_to_peer(struct daemon *daemon, const u8 *msg)
 	try_connect_peer(daemon, &id, seconds_waited, addrs, addrhint);
 }
 
+void peer_conn_closed(struct peer *peer)
+{
+	/* Wake up in case there's a reconnecting peer waiting in io_wait. */
+	io_wake(peer);
+
+	/* Note: deleting from a htable (a-la node_set_del) does not free it:
+	 * htable doesn't assume it's a tal object at all. That's why we have
+	 * a destructor attached to peer (called destroy_peer by
+	 * convention). */
+	tal_free(peer);
+}
+
 /* A peer is gone: clean things up. */
 static void cleanup_dead_peer(struct daemon *daemon, const struct node_id *id)
 {
@@ -1835,14 +1848,12 @@ static void cleanup_dead_peer(struct daemon *daemon, const struct node_id *id)
 			      type_to_string(tmpctx, struct node_id, id));
 	status_peer_debug(id, "disconnect");
 
-	/* Wake up in case there's a reconnecting peer waiting in io_wait. */
-	io_wake(peer);
-
-	/* Note: deleting from a htable (a-la node_set_del) does not free it:
-	 * htable doesn't assume it's a tal object at all. That's why we have
-	 * a destructor attached to peer (called destroy_peer by
-	 * convention). */
-	tal_free(peer);
+	/* Make sure we flush any outstanding writes! */
+	if (peer->to_peer) {
+		close_peer_conn(peer);
+		/* It calls peer_conn_closed() when done */
+	} else
+		peer_conn_closed(peer);
 }
 
 /* lightningd tells us a peer has disconnected. */
diff --git a/connectd/connectd.h b/connectd/connectd.h
index d99e00b4c..aa151ea89 100644
--- a/connectd/connectd.h
+++ b/connectd/connectd.h
@@ -47,6 +47,9 @@ struct peer {
 	/* Final message to send to peer (and hangup) */
 	u8 *final_msg;
 
+	/* Set when we want to close. */
+	bool told_to_close;
+
 	/* When socket has Nagle overridden */
 	bool urgent;
 
@@ -181,4 +184,7 @@ struct io_plan *peer_connected(struct io_conn *conn,
 			       const u8 *their_features TAKES,
 			       bool incoming);
 
+/* Called when peer->peer_conn is finally freed */
+void peer_conn_closed(struct peer *peer);
+
 #endif /* LIGHTNING_CONNECTD_CONNECTD_H */
diff --git a/connectd/multiplex.c b/connectd/multiplex.c
index cad5c1cbc..55f23f64e 100644
--- a/connectd/multiplex.c
+++ b/connectd/multiplex.c
@@ -11,6 +11,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -386,6 +387,22 @@ static bool handle_message_locally(struct peer *peer, const u8 *msg)
 	return true;
 }
 
+static void close_timeout(struct peer *peer)
+{
+	/* BROKEN means we'll trigger CI if we see it, though it's possible */
+	status_peer_broken(&peer->id, "Peer did not close, forcing close");
+	tal_free(peer->to_peer);
+}
+
+/* Close this in 5 seconds if it doesn't do so by itself. */
+static void set_closing_timer(struct peer *peer,
+			      struct io_conn *peer_conn)
+{
+	notleak(new_reltimer(&peer->daemon->timers,
+			     peer_conn, time_from_sec(5),
+			     close_timeout, peer));
+}
+
 static struct io_plan *write_to_peer(struct io_conn *peer_conn,
 				     struct peer *peer)
 {
@@ -406,6 +423,13 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
 						peer->final_msg,
 						after_final_msg);
 		}
+
+		/* We close once subds are all closed. */
+		if (!peer->to_subd) {
+			set_closing_timer(peer, peer_conn);
+			return io_sock_shutdown(peer_conn);
+		}
+
 		/* If they want us to send gossip, do so now. */
 		msg = maybe_from_gossip_store(NULL, peer);
 		if (!msg) {
@@ -498,6 +522,12 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
 		return read_hdr_from_peer(peer_conn, peer);
 	}
 
+	/* Don't process packets while we're closing */
+	if (peer->told_to_close) {
+		tal_free(decrypted);
+		return read_hdr_from_peer(peer_conn, peer);
+	}
+
 	/* If we swallow this, just try again. */
 	if (handle_message_locally(peer, decrypted)) {
 		tal_free(decrypted);
@@ -560,6 +590,15 @@ static void destroy_subd_conn(struct io_conn *subd_conn, struct peer *peer)
 	msg_wake(peer->peer_outq);
 }
 
+void close_peer_conn(struct peer *peer)
+{
+	/* Make write_to_peer do flush after writing */
+	peer->told_to_close = true;
+
+	/* In case it's not currently writing, wake write_to_peer */
+	msg_wake(peer->peer_outq);
+}
+
 bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd)
 {
 	int fds[2];
@@ -583,6 +622,9 @@ static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer)
 	/* Close internal connections if not already. */
 	if (peer->to_subd)
 		io_close(peer->to_subd);
+
+	if (peer->told_to_close)
+		peer_conn_closed(peer);
 }
 
 struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn,
diff --git a/connectd/multiplex.h b/connectd/multiplex.h
index 524e48299..0b3ba146a 100644
--- a/connectd/multiplex.h
+++ b/connectd/multiplex.h
@@ -27,4 +27,7 @@ void queue_peer_msg(struct peer *peer, const u8 *msg TAKES);
 void setup_peer_gossip_store(struct peer *peer,
 			     const struct feature_set *our_features,
 			     const u8 *their_features);
+
+/* Start the process of flushing and closing the peer_conn */
+void close_peer_conn(struct peer *peer);
 #endif /* LIGHTNING_CONNECTD_MULTIPLEX_H */
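
For context, the pattern this diff implements with io_sock_shutdown() plus a 5-second reltimer is the classic "flush, half-close the write side, let the peer drain, then force-close on timeout". Below is a minimal standalone sketch of that pattern in plain POSIX sockets rather than ccan/io; the helper name drain_and_close(), the buffer size, and the poll-based loop are illustrative assumptions, and only the 5-second figure comes from the patch itself.

/* Sketch only: flush-then-close using plain POSIX sockets.
 * Assumes the caller has already written out everything it wants to send. */
#include <poll.h>
#include <stdbool.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define CLOSE_TIMEOUT_MS 5000	/* mirrors the patch's 5-second timer */

/* Returns true if the peer closed its side in time, false if we forced it. */
static bool drain_and_close(int fd)
{
	struct pollfd pfd = { .fd = fd, .events = POLLIN };
	char buf[4096];

	/* Stop sending: the peer sees EOF but can still send us its
	 * final messages (like io_sock_shutdown() in write_to_peer). */
	shutdown(fd, SHUT_WR);

	/* Discard whatever the peer still sends, until it closes or no
	 * data arrives for CLOSE_TIMEOUT_MS. */
	while (poll(&pfd, 1, CLOSE_TIMEOUT_MS) > 0) {
		ssize_t r = read(fd, buf, sizeof(buf));
		if (r <= 0) {
			/* 0 == orderly EOF from peer; < 0 == error. */
			close(fd);
			return r == 0;
		}
	}

	/* Timed out (or poll failed): force the close, analogous to
	 * close_timeout() freeing peer->to_peer in the patch. */
	close(fd);
	return false;
}

Half-closing the write side first is what makes "flush any outstanding writes" meaningful: the kernel can still push our queued bytes and the remote end gets a clean EOF, while the timer bounds how long a stalled or malicious peer can keep the connection pinned.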