connectd: flush queues before hanging up.

This is critical in the common case where peer sends an error and
hangs up: we almost never get to relay the error to the subd in time.

This also applies in the other direction: we need to flush the queue
to the peer when the subd closes.  Note we only free the actual peer
struct when lightningd reaps us with connectd_peer_disconnected().

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2022-01-11 11:47:01 +10:30
parent 0841e4190b
commit 1ae3172409
2 changed files with 42 additions and 11 deletions

View File

@@ -364,12 +364,15 @@ static struct peer *new_peer(struct daemon *daemon,
peer->dev_read_enabled = true;
#endif
peer->to_peer = conn;
/* Aim for connection to shuffle data back and forth: sets up
* peer->to_subd */
if (!multiplex_subd_setup(peer, fd_for_subd))
return tal_free(peer);
peer->to_peer = tal_steal(peer, conn);
/* Now we own it */
tal_steal(peer, peer->to_peer);
peer_htable_add(&daemon->peers, peer);
tal_add_destructor2(peer, destroy_peer, daemon);
@@ -1825,6 +1828,11 @@ static void connect_to_peer(struct daemon *daemon, const u8 *msg)
void peer_conn_closed(struct peer *peer)
{
/* These should be closed already! */
assert(!peer->to_subd);
assert(!peer->to_peer);
assert(peer->told_to_close);
/* Wake up in case there's a reconnecting peer waiting in io_wait. */
io_wake(peer);
@@ -1848,12 +1856,8 @@ static void cleanup_dead_peer(struct daemon *daemon, const struct node_id *id)
type_to_string(tmpctx, struct node_id, id));
status_peer_debug(id, "disconnect");
/* Make sure we flush any outstanding writes! */
if (peer->to_peer) {
close_peer_conn(peer);
/* It calls peer_conn_closed() when done */
} else
peer_conn_closed(peer);
/* When it's finished, it will call peer_conn_closed() */
close_peer_conn(peer);
}
/* lightningd tells us a peer has disconnected. */
@@ -1893,7 +1897,7 @@ static void peer_final_msg(struct io_conn *conn,
/* This can happen if peer hung up on us. */
peer = peer_htable_get(&daemon->peers, &id);
if (peer) {
/* Log and encrypt message for peer. */
/* Log message for peer. */
status_peer_io(LOG_IO_OUT, &id, finalmsg);
multiplex_final_msg(peer, take(finalmsg));
}

View File

@@ -46,6 +46,7 @@ static void send_warning(struct peer *peer, const char *fmt, ...)
/* Close locally, send msg as final warning */
io_close(peer->to_subd);
peer->to_subd = NULL;
va_start(ap, fmt);
peer->final_msg = towire_warningfmtv(peer, NULL, fmt, ap);
@@ -475,6 +476,10 @@ static struct io_plan *write_to_subd(struct io_conn *subd_conn,
/* Nothing to send? */
if (!msg) {
/* If peer is closed, close this. */
if (!peer->to_peer)
return io_close(subd_conn);
/* Tell them to read again. */
io_wake(&peer->peer_in);
@@ -520,6 +525,12 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
return read_hdr_from_peer(peer_conn, peer);
}
/* If there's no subd, discard and keep reading. */
if (!peer->to_subd) {
tal_free(decrypted);
return read_hdr_from_peer(peer_conn, peer);
}
/* Tell them to write. */
msg_enqueue(peer->subd_outq, take(decrypted));
@@ -574,6 +585,14 @@ static void destroy_subd_conn(struct io_conn *subd_conn, struct peer *peer)
/* In case they were waiting for this to send final_msg */
if (peer->final_msg)
msg_wake(peer->peer_outq);
/* Make sure we try to keep reading from peer, so we know if
* it hangs up! */
io_wake(&peer->peer_in);
/* If no peer, finally time to close */
if (!peer->to_peer && peer->told_to_close)
peer_conn_closed(peer);
}
void close_peer_conn(struct peer *peer)
@@ -581,6 +600,12 @@ void close_peer_conn(struct peer *peer)
/* Make write_to_peer do flush after writing */
peer->told_to_close = true;
/* Already dead? */
if (!peer->to_subd && !peer->to_peer) {
peer_conn_closed(peer);
return;
}
/* In case it's not currently writing, wake write_to_peer */
msg_wake(peer->peer_outq);
}
@@ -605,9 +630,11 @@ static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer)
assert(peer->to_peer == peer_conn);
peer->to_peer = NULL;
/* Close internal connections if not already. */
if (peer->to_subd)
io_close(peer->to_subd);
/* Flush internal connections if not already. */
if (peer->to_subd) {
msg_wake(peer->subd_outq);
return;
}
if (peer->told_to_close)
peer_conn_closed(peer);