connectd: put peer into "draining" mode when we want to close it.

This removes it from the hashtable, and forces it to do nothing but
send out any remaining packets, then close.

It is, in effect, reduced to a stub, with no further interactions
with the rest of the system (all subds are freed already).

Also removes the need for an explicit "final_msg" too.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2022-07-16 14:19:29 +09:30
committed by neil saitug
parent 37ff013c2c
commit 9dc3880360
3 changed files with 51 additions and 34 deletions

View File

@@ -284,9 +284,9 @@ static struct io_plan *peer_reconnected(struct io_conn *conn,
}
/*~ When we free a peer, we remove it from the daemon's hashtable */
static void destroy_peer(struct peer *peer, struct daemon *daemon)
void destroy_peer(struct peer *peer)
{
peer_htable_del(&daemon->peers, peer);
peer_htable_del(&peer->daemon->peers, peer);
}
/*~ This is where we create a new peer. */
@@ -302,13 +302,13 @@ static struct peer *new_peer(struct daemon *daemon,
peer->daemon = daemon;
peer->id = *id;
peer->cs = *cs;
peer->final_msg = NULL;
peer->subds = tal_arr(peer, struct subd *, 0);
peer->peer_in = NULL;
peer->sent_to_peer = NULL;
peer->urgent = false;
peer->ready_to_die = false;
peer->active = false;
peer->draining = false;
peer->peer_outq = msg_queue_new(peer, false);
peer->last_recv_time = time_now();
@@ -322,7 +322,7 @@ static struct peer *new_peer(struct daemon *daemon,
/* Now we own it */
tal_steal(peer, peer->to_peer);
peer_htable_add(&daemon->peers, peer);
tal_add_destructor2(peer, destroy_peer, daemon);
tal_add_destructor(peer, destroy_peer);
return peer;
}

View File

@@ -53,12 +53,12 @@ struct peer {
/* Connection to the peer */
struct io_conn *to_peer;
/* Is this draining? If so, just keep writing until queue empty */
bool draining;
/* Connections to the subdaemons */
struct subd **subds;
/* Final message to send to peer (and hangup) */
u8 *final_msg;
/* Set once lightningd says it's OK to close (subd tells it
* it's done). */
bool ready_to_die;
@@ -223,4 +223,7 @@ struct io_plan *peer_connected(struct io_conn *conn,
/* Called when peer->peer_conn is finally freed */
void peer_conn_closed(struct peer *peer);
/* Removes peer from hash table */
void destroy_peer(struct peer *peer);
#endif /* LIGHTNING_CONNECTD_CONNECTD_H */

View File

@@ -81,28 +81,54 @@ static struct subd *find_subd(struct peer *peer,
return NULL;
}
/* We just want to send these messages out to the peer's connection,
* then close. We consider the peer dead to us (can be freed). */
static void drain_peer(struct peer *peer)
{
assert(!peer->draining);
/* FIXME: Don't drain forever! */
notleak(peer);
/* We no longer want subds feeding us more messages! */
peer->subds = tal_free(peer->subds);
peer->draining = true;
/* Clean peer from hashtable; we no longer exist. */
destroy_peer(peer);
tal_del_destructor(peer, destroy_peer);
/* Start draining process! */
io_wake(peer->peer_outq);
}
void inject_peer_msg(struct peer *peer, const u8 *msg TAKES)
{
status_peer_io(LOG_IO_OUT, &peer->id, msg);
msg_enqueue(peer->peer_outq, msg);
}
void multiplex_final_msg(struct peer *peer, const u8 *final_msg TAKES)
{
inject_peer_msg(peer, final_msg);
drain_peer(peer);
}
/* Send warning, close connection to peer */
static void send_warning(struct peer *peer, const char *fmt, ...)
{
va_list ap;
u8 *msg;
va_start(ap, fmt);
status_vfmt(LOG_UNUSUAL, &peer->id, fmt, ap);
va_end(ap);
/* Close to any subdaemons. */
peer->subds = tal_free(peer->subds);
/* Send warning as final message. */
va_start(ap, fmt);
peer->final_msg = towire_warningfmtv(peer, NULL, fmt, ap);
msg = towire_warningfmtv(NULL, NULL, fmt, ap);
va_end(ap);
multiplex_final_msg(peer, take(msg));
}
/* Kicks off write_to_peer() to look for more gossip to send from store */
@@ -934,17 +960,14 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
/* Pop tail of send queue */
msg = msg_dequeue(peer->peer_outq);
/* Is it time to send final? */
if (!msg && peer->final_msg && tal_count(peer->subds) == 0) {
/* OK, send this then close. */
msg = peer->final_msg;
peer->final_msg = NULL;
/* Wasn't logged earlier, so do it now */
status_peer_io(LOG_IO_OUT, &peer->id, msg);
}
/* Still nothing to send? */
if (!msg) {
/* Draining? We're done. */
if (peer->draining) {
set_closing_timer(peer, peer_conn);
return io_sock_shutdown(peer_conn);
}
/* We close once subds are all closed; or if we're not
active, when told to die. */
if ((peer->active || peer->ready_to_die)
@@ -1083,8 +1106,6 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
send_warning(peer, "Unexpected message %s: %s",
peer_wire_name(type),
tal_hex(tmpctx, decrypted));
io_wake(peer->peer_outq);
return read_hdr_from_peer(peer_conn, peer);
}
@@ -1127,6 +1148,10 @@ static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn,
{
assert(peer->to_peer == peer_conn);
/* If we're draining, ignore all incoming. */
if (peer->draining)
return io_halfclose(peer_conn);
/* BOLT #8:
*
* ### Receiving and Decrypting Messages
@@ -1164,10 +1189,6 @@ static void destroy_subd(struct subd *subd)
tal_arr_remove(&peer->subds, pos);
/* In case they were waiting for this to send final_msg */
if (tal_count(peer->subds) == 0 && peer->final_msg)
msg_wake(peer->peer_outq);
/* Make sure we try to keep reading from peer, so we know if
* it hangs up! */
io_wake(&peer->peer_in);
@@ -1260,13 +1281,6 @@ struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn,
write_to_peer(peer_conn, peer));
}
void multiplex_final_msg(struct peer *peer, const u8 *final_msg TAKES)
{
peer->ready_to_die = true;
peer->final_msg = tal_dup_talarr(peer, u8, final_msg);
if (tal_count(peer->subds) == 0)
io_wake(peer->peer_outq);
}
/* Lightningd says to send a ping */
void send_manual_ping(struct daemon *daemon, const u8 *msg)