From d29795a19829e308e66daf87524420bd83889c7f Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Tue, 11 Jan 2022 11:46:49 +1030 Subject: [PATCH] connectd: don't just close to peer, but use shutdown(). We would lose packets sometimes due to this previously, but it doesn't happen over localhost so our tests didn't notice. However, now we have connectd being sole thing talking to peers, we can do a more elegant shutdown, which should fix closing. Signed-off-by: Rusty Russell Changelog-Fixed: Protocol: Always flush sockets to increase chance that final message get to peer (esp. error packets). --- connectd/connectd.c | 27 +++++++++++++++++++-------- connectd/connectd.h | 6 ++++++ connectd/multiplex.c | 42 ++++++++++++++++++++++++++++++++++++++++++ connectd/multiplex.h | 3 +++ 4 files changed, 70 insertions(+), 8 deletions(-) diff --git a/connectd/connectd.c b/connectd/connectd.c index 287c4b5fc..10944467b 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -355,6 +355,7 @@ static struct peer *new_peer(struct daemon *daemon, peer->peer_in = NULL; peer->sent_to_peer = NULL; peer->urgent = false; + peer->told_to_close = false; peer->peer_outq = msg_queue_new(peer, false); peer->subd_outq = msg_queue_new(peer, false); @@ -1822,6 +1823,18 @@ static void connect_to_peer(struct daemon *daemon, const u8 *msg) try_connect_peer(daemon, &id, seconds_waited, addrs, addrhint); } +void peer_conn_closed(struct peer *peer) +{ + /* Wake up in case there's a reconnecting peer waiting in io_wait. */ + io_wake(peer); + + /* Note: deleting from a htable (a-la node_set_del) does not free it: + * htable doesn't assume it's a tal object at all. That's why we have + * a destructor attached to peer (called destroy_peer by + * convention). */ + tal_free(peer); +} + /* A peer is gone: clean things up. */ static void cleanup_dead_peer(struct daemon *daemon, const struct node_id *id) { @@ -1835,14 +1848,12 @@ static void cleanup_dead_peer(struct daemon *daemon, const struct node_id *id) type_to_string(tmpctx, struct node_id, id)); status_peer_debug(id, "disconnect"); - /* Wake up in case there's a reconnecting peer waiting in io_wait. */ - io_wake(peer); - - /* Note: deleting from a htable (a-la node_set_del) does not free it: - * htable doesn't assume it's a tal object at all. That's why we have - * a destructor attached to peer (called destroy_peer by - * convention). */ - tal_free(peer); + /* Make sure we flush any outstanding writes! */ + if (peer->to_peer) { + close_peer_conn(peer); + /* It calls peer_conn_closed() when done */ + } else + peer_conn_closed(peer); } /* lightningd tells us a peer has disconnected. */ diff --git a/connectd/connectd.h b/connectd/connectd.h index d99e00b4c..aa151ea89 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -47,6 +47,9 @@ struct peer { /* Final message to send to peer (and hangup) */ u8 *final_msg; + /* Set when we want to close. */ + bool told_to_close; + /* When socket has Nagle overridden */ bool urgent; @@ -181,4 +184,7 @@ struct io_plan *peer_connected(struct io_conn *conn, const u8 *their_features TAKES, bool incoming); +/* Called when peer->peer_conn is finally freed */ +void peer_conn_closed(struct peer *peer); + #endif /* LIGHTNING_CONNECTD_CONNECTD_H */ diff --git a/connectd/multiplex.c b/connectd/multiplex.c index cad5c1cbc..55f23f64e 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -386,6 +387,22 @@ static bool handle_message_locally(struct peer *peer, const u8 *msg) return true; } +static void close_timeout(struct peer *peer) +{ + /* BROKEN means we'll trigger CI if we see it, though it's possible */ + status_peer_broken(&peer->id, "Peer did not close, forcing close"); + tal_free(peer->to_peer); +} + +/* Close this in 5 seconds if it doesn't do so by itself. */ +static void set_closing_timer(struct peer *peer, + struct io_conn *peer_conn) +{ + notleak(new_reltimer(&peer->daemon->timers, + peer_conn, time_from_sec(5), + close_timeout, peer)); +} + static struct io_plan *write_to_peer(struct io_conn *peer_conn, struct peer *peer) { @@ -406,6 +423,13 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, peer->final_msg, after_final_msg); } + + /* We close once subds are all closed. */ + if (!peer->to_subd) { + set_closing_timer(peer, peer_conn); + return io_sock_shutdown(peer_conn); + } + /* If they want us to send gossip, do so now. */ msg = maybe_from_gossip_store(NULL, peer); if (!msg) { @@ -498,6 +522,12 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, return read_hdr_from_peer(peer_conn, peer); } + /* Don't process packets while we're closing */ + if (peer->told_to_close) { + tal_free(decrypted); + return read_hdr_from_peer(peer_conn, peer); + } + /* If we swallow this, just try again. */ if (handle_message_locally(peer, decrypted)) { tal_free(decrypted); @@ -560,6 +590,15 @@ static void destroy_subd_conn(struct io_conn *subd_conn, struct peer *peer) msg_wake(peer->peer_outq); } +void close_peer_conn(struct peer *peer) +{ + /* Make write_to_peer do flush after writing */ + peer->told_to_close = true; + + /* In case it's not currently writing, wake write_to_peer */ + msg_wake(peer->peer_outq); +} + bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd) { int fds[2]; @@ -583,6 +622,9 @@ static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer) /* Close internal connections if not already. */ if (peer->to_subd) io_close(peer->to_subd); + + if (peer->told_to_close) + peer_conn_closed(peer); } struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn, diff --git a/connectd/multiplex.h b/connectd/multiplex.h index 524e48299..0b3ba146a 100644 --- a/connectd/multiplex.h +++ b/connectd/multiplex.h @@ -27,4 +27,7 @@ void queue_peer_msg(struct peer *peer, const u8 *msg TAKES); void setup_peer_gossip_store(struct peer *peer, const struct feature_set *our_features, const u8 *their_features); + +/* Start the process of flushing and closing the peer_conn */ +void close_peer_conn(struct peer *peer); #endif /* LIGHTNING_CONNECTD_MULTIPLEX_H */