diff --git a/connectd/connectd.c b/connectd/connectd.c
index 5d57fd8f7..08a229ef0 100644
--- a/connectd/connectd.c
+++ b/connectd/connectd.c
@@ -211,10 +211,15 @@ static void peer_connected_in(struct daemon *daemon,
 	tal_free(connect);
 }
 
-/*~ When we free a peer, we remove it from the daemon's hashtable */
+/*~ When we free a peer, we remove it from the daemon's hashtable.
+ * We also call this manually if we want to elegantly drain peer's
+ * queues. */
 void destroy_peer(struct peer *peer)
 {
-	peer_htable_del(&peer->daemon->peers, peer);
+	assert(!peer->draining);
+
+	if (!peer_htable_del(&peer->daemon->peers, peer))
+		abort();
 
 	/* Tell gossipd to stop asking this peer gossip queries */
 	daemon_conn_send(peer->daemon->gossipd,
@@ -225,6 +230,10 @@ void destroy_peer(struct peer *peer)
 			 take(towire_connectd_peer_disconnect_done(NULL,
 								   &peer->id,
 								   peer->counter)));
+	/* This makes multiplex.c routines not feed us more, but
+	 * *also* means that if we're freed directly, the ->to_peer
+	 * destructor won't call drain_peer(). */
+	peer->draining = true;
 }
 
 /*~ This is where we create a new peer. */
@@ -282,7 +291,7 @@ struct io_plan *peer_connected(struct io_conn *conn,
 	int subd_fd;
 	bool option_gossip_queries;
 
-	/* We remove any previous connection, on the assumption it's dead */
+	/* We remove any previous connection immediately, on the assumption it's dead */
 	peer = peer_htable_get(&daemon->peers, id);
 	if (peer)
 		tal_free(peer);
@@ -1858,8 +1867,8 @@ static void peer_discard(struct daemon *daemon, const u8 *msg)
 	/* If it's reconnected already, it will learn soon. */
 	if (peer->counter != counter)
 		return;
-	status_peer_debug(&id, "disconnect");
-	tal_free(peer);
+	status_peer_debug(&id, "discard_peer");
+	drain_peer(peer);
 }
 
 /* lightningd tells us to send a msg and disconnect. */
diff --git a/connectd/multiplex.c b/connectd/multiplex.c
index 2ae1b7514..6d583cc0b 100644
--- a/connectd/multiplex.c
+++ b/connectd/multiplex.c
@@ -81,39 +81,76 @@ static struct subd *find_subd(struct peer *peer,
 	return NULL;
 }
 
-/* We try to send the final messages, but if buffer is full and they're
- * not reading, we have to give up. */
-static void close_timeout(struct peer *peer)
+/* Except for a reconnection, we finally free a peer when the io_conn
+ * is closed and all subds are gone. */
+static void maybe_free_peer(struct peer *peer)
 {
-	/* BROKEN means we'll trigger CI if we see it, though it's possible */
-	status_peer_broken(&peer->id, "Peer did not close, forcing close");
+	if (peer->to_peer)
+		return;
+	if (tal_count(peer->subds) != 0)
+		return;
+	status_debug("maybe_free_peer freeing peer!");
 	tal_free(peer);
 }
 
-/* We just want to send these messages out to the peer's connection,
- * then close. We consider the peer dead to us (can be freed). */
-static void drain_peer(struct peer *peer)
+/* We try to send the final messages, but if buffer is full and they're
+ * not reading, we have to give up. */
+static void close_peer_io_timeout(struct peer *peer)
 {
+	/* BROKEN means we'll trigger CI if we see it, though it's possible */
+	status_peer_broken(&peer->id, "Peer did not close, forcing close");
+	io_close(peer->to_peer);
+}
+
+static void close_subd_timeout(struct subd *subd)
+{
+	/* BROKEN means we'll trigger CI if we see it, though it's possible */
+	status_peer_broken(&subd->peer->id, "Subd did not close, forcing close");
+	io_close(subd->conn);
+}
+
+void drain_peer(struct peer *peer)
+{
+	status_debug("drain_peer");
 	assert(!peer->draining);
-	/* This is a 5-second leak, worst case! */
-	notleak(peer);
+	/* Since we immediately free any subds we didn't connect yet,
+	 * we need peer->to_peer set so it won't free peer! */
+	assert(peer->to_peer);
 
-	/* We no longer want subds feeding us more messages (they
-	 * remove themselves from array when freed) */
-	while (tal_count(peer->subds))
-		tal_free(peer->subds[0]);
-	peer->draining = true;
+	/* Give the subds 5 seconds to close their fds to us. */
+	for (size_t i = 0; i < tal_count(peer->subds); i++) {
+		if (!peer->subds[i]->conn) {
+			/* Deletes itself from array, so be careful! */
+			tal_free(peer->subds[i]);
+			i--;
+			continue;
+		}
+		status_debug("drain_peer draining subd!");
+		notleak(new_reltimer(&peer->daemon->timers,
+				     peer->subds[i], time_from_sec(5),
+				     close_subd_timeout, peer->subds[i]));
+		/* Wake any outgoing queued on subd */
+		io_wake(peer->subds[i]->outq);
+	}
 
-	/* You have 5 seconds to drain... */
-	notleak(new_reltimer(&peer->daemon->timers,
-			     peer, time_from_sec(5),
-			     close_timeout, peer));
+	/* Wake them to ensure they notice the close! */
+	io_wake(&peer->subds);
+
+	if (peer->to_peer) {
+		/* You have 5 seconds to drain... */
+		notleak(new_reltimer(&peer->daemon->timers,
+				     peer->to_peer, time_from_sec(5),
+				     close_peer_io_timeout, peer));
+	}
 
 	/* Clean peer from hashtable; we no longer exist. */
 	destroy_peer(peer);
 	tal_del_destructor(peer, destroy_peer);
 
+	/* This is a 5-second leak, worst case! */
+	notleak(peer);
+
 	/* Start draining process! */
 	io_wake(peer->peer_outq);
 }
 
@@ -899,12 +936,13 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
 
 	/* Still nothing to send? */
 	if (!msg) {
-		/* Draining? We're done. */
-		if (peer->draining)
+		/* Draining? We're done when subds are done. */
+		if (peer->draining && tal_count(peer->subds) == 0)
 			return io_sock_shutdown(peer_conn);
 
 		/* If they want us to send gossip, do so now. */
-		msg = maybe_from_gossip_store(NULL, peer);
+		if (!peer->draining)
+			msg = maybe_from_gossip_store(NULL, peer);
 		if (!msg) {
 			/* Tell them to read again, */
 			io_wake(&peer->subds);
@@ -992,6 +1030,9 @@ static void destroy_subd(struct subd *subd)
 	/* Make sure we try to keep reading from peer (might
 	 * have been waiting for write_to_subd) */
 	io_wake(&peer->peer_in);
+
+	/* Maybe we were last subd out? */
+	maybe_free_peer(peer);
 }
 
 static struct subd *new_subd(struct peer *peer,
@@ -1132,6 +1173,9 @@ static struct io_plan *subd_conn_init(struct io_conn *subd_conn,
 					struct subd *subd)
 {
 	subd->conn = subd_conn;
+
+	/* subd is a child of the conn: free when it closes! */
+	tal_steal(subd->conn, subd);
 	return io_duplex(subd_conn,
 			 read_from_subd(subd_conn, subd),
 			 write_to_subd(subd_conn, subd));
@@ -1140,18 +1184,15 @@ static struct io_plan *subd_conn_init(struct io_conn *subd_conn,
 static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer)
 {
 	assert(peer->to_peer == peer_conn);
+
+	/* If subds need cleaning, this will do it */
+	if (!peer->draining)
+		drain_peer(peer);
+
 	peer->to_peer = NULL;
 
-	/* Flush internal connections if any: last one out will free peer. */
-	if (tal_count(peer->subds) != 0) {
-		for (size_t i = 0; i < tal_count(peer->subds); i++)
-			msg_wake(peer->subds[i]->outq);
-		return;
-	}
-
-	/* We never had any subds? Free peer (might already be being freed,
-	 * as it's our parent, but that's allowed by tal). */
-	tal_free(peer);
+	/* Or if there were no subds, this will free the peer. */
+	maybe_free_peer(peer);
 }
 
 struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn,
@@ -1203,8 +1244,9 @@ void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd)
 
 	subd = new_subd(peer, &channel_id);
 	assert(!subd->conn);
-	/* This sets subd->conn inside subd_conn_init */
-	io_new_conn(subd, fd, subd_conn_init, subd);
+
+	/* This sets subd->conn inside subd_conn_init, and reparents subd! */
+	io_new_conn(peer, fd, subd_conn_init, subd);
 }
 
 /* Lightningd says to send a ping */
diff --git a/connectd/multiplex.h b/connectd/multiplex.h
index 77a41fa2a..a3bda8ad3 100644
--- a/connectd/multiplex.h
+++ b/connectd/multiplex.h
@@ -22,6 +22,10 @@ void multiplex_final_msg(struct peer *peer,
  * this does io logging. */
 void inject_peer_msg(struct peer *peer, const u8 *msg TAKES);
 
+/* Start closing the peer: removes itself from hash table, frees itself
+ * once done. */
+void drain_peer(struct peer *peer);
+
 void setup_peer_gossip_store(struct peer *peer,
 			     const struct feature_set *our_features,
 			     const u8 *their_features);
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 24c7cec89..6b6d33ba2 100644
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -2869,8 +2869,8 @@ def test_opener_feerate_reconnect(node_factory, bitcoind):
     l2.daemon.wait_for_log(r'dev_disconnect: \-WIRE_COMMITMENT_SIGNED')
 
     # Wait until they reconnect.
-    l1.daemon.wait_for_log('Peer transient failure in CHANNELD_NORMAL')
-    l1.daemon.wait_for_log('peer_disconnect_done')
+    l1.daemon.wait_for_logs(['Peer transient failure in CHANNELD_NORMAL',
+                             'peer_disconnect_done'])
     wait_for(lambda: l1.rpc.getpeer(l2.info['id'])['connected'])
 
     # Should work normally.
diff --git a/tests/test_opening.py b/tests/test_opening.py
index 3a454d50e..8d63b2a92 100644
--- a/tests/test_opening.py
+++ b/tests/test_opening.py
@@ -354,7 +354,7 @@ def test_v2_rbf_liquidity_ad(node_factory, bitcoind, chainparams):
     # l1 leases a channel from l2
     l1.rpc.connect(l2.info['id'], 'localhost', l2.port)
     rates = l1.rpc.dev_queryrates(l2.info['id'], amount, amount)
-    l1.daemon.wait_for_log('disconnect')
+    wait_for(lambda: l1.rpc.listpeers()['peers'] == [])
    l1.rpc.connect(l2.info['id'], 'localhost', l2.port)
     chan_id = l1.rpc.fundchannel(l2.info['id'], amount, request_amt=amount,
                                  feerate='{}perkw'.format(feerate),
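Reviewer note (not part of the patch): the lifetime rule the diff introduces is that a peer is freed only once its wire io_conn is gone *and* its last subd connection is gone; drain_peer() only starts that shutdown (removing the peer from the hashtable and arming 5-second force-close timers), while maybe_free_peer() performs the actual free from the two close paths. Below is a minimal standalone C sketch of that rule using hypothetical toy_* names; it deliberately omits the real tal/ccan_io/timer machinery.

#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Toy model (hypothetical, for illustration only): the peer is freed
 * only when the wire connection is closed AND the last subd is gone,
 * mirroring maybe_free_peer()/drain_peer() in connectd/multiplex.c. */
struct toy_peer {
	bool has_conn;  /* stands in for peer->to_peer */
	int num_subds;  /* stands in for tal_count(peer->subds) */
	bool draining;  /* stands in for peer->draining */
};

/* Mirrors maybe_free_peer(): only the last reference out frees. */
static void toy_maybe_free(struct toy_peer *p)
{
	if (p->has_conn || p->num_subds != 0)
		return;
	printf("freeing peer\n");
	free(p);
}

/* Mirrors drain_peer(): mark draining and let the conn/subd close
 * paths call toy_maybe_free() later (timers and queue wakeups elided). */
static void toy_drain(struct toy_peer *p)
{
	assert(!p->draining);
	assert(p->has_conn);
	p->draining = true;
}

static void toy_subd_closed(struct toy_peer *p)
{
	p->num_subds--;
	toy_maybe_free(p);
}

static void toy_conn_closed(struct toy_peer *p)
{
	if (!p->draining)
		toy_drain(p);
	p->has_conn = false;
	toy_maybe_free(p);
}

int main(void)
{
	struct toy_peer *p = calloc(1, sizeof(*p));
	p->has_conn = true;
	p->num_subds = 2;

	toy_drain(p);        /* e.g. lightningd asked us to discard the peer */
	toy_subd_closed(p);  /* one subd fd closes */
	toy_conn_closed(p);  /* wire connection closes */
	toy_subd_closed(p);  /* last subd out frees the peer */
	return 0;
}

The order of events in main() mirrors the peer_discard path: lightningd asks for a discard, the subds and the wire connection close in any order, and whichever reference drops last performs the free.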