diff --git a/connectd/connectd.c b/connectd/connectd.c index 48d027e54..df21e7a45 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -284,9 +284,9 @@ static struct io_plan *peer_reconnected(struct io_conn *conn, } /*~ When we free a peer, we remove it from the daemon's hashtable */ -static void destroy_peer(struct peer *peer, struct daemon *daemon) +void destroy_peer(struct peer *peer) { - peer_htable_del(&daemon->peers, peer); + peer_htable_del(&peer->daemon->peers, peer); } /*~ This is where we create a new peer. */ @@ -302,13 +302,13 @@ static struct peer *new_peer(struct daemon *daemon, peer->daemon = daemon; peer->id = *id; peer->cs = *cs; - peer->final_msg = NULL; peer->subds = tal_arr(peer, struct subd *, 0); peer->peer_in = NULL; peer->sent_to_peer = NULL; peer->urgent = false; peer->ready_to_die = false; peer->active = false; + peer->draining = false; peer->peer_outq = msg_queue_new(peer, false); peer->last_recv_time = time_now(); @@ -322,7 +322,7 @@ static struct peer *new_peer(struct daemon *daemon, /* Now we own it */ tal_steal(peer, peer->to_peer); peer_htable_add(&daemon->peers, peer); - tal_add_destructor2(peer, destroy_peer, daemon); + tal_add_destructor(peer, destroy_peer); return peer; } diff --git a/connectd/connectd.h b/connectd/connectd.h index a1a559eb0..43e7b242a 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -53,12 +53,12 @@ struct peer { /* Connection to the peer */ struct io_conn *to_peer; + /* Is this draining? If so, just keep writing until queue empty */ + bool draining; + /* Connections to the subdaemons */ struct subd **subds; - /* Final message to send to peer (and hangup) */ - u8 *final_msg; - /* Set once lightningd says it's OK to close (subd tells it * it's done). */ bool ready_to_die; @@ -223,4 +223,7 @@ struct io_plan *peer_connected(struct io_conn *conn, /* Called when peer->peer_conn is finally freed */ void peer_conn_closed(struct peer *peer); +/* Removes peer from hash table */ +void destroy_peer(struct peer *peer); + #endif /* LIGHTNING_CONNECTD_CONNECTD_H */ diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 772b82611..9a88724b5 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -81,28 +81,54 @@ static struct subd *find_subd(struct peer *peer, return NULL; } +/* We just want to send these messages out to the peer's connection, + * then close. We consider the peer dead to us (can be freed). */ +static void drain_peer(struct peer *peer) +{ + assert(!peer->draining); + + /* FIXME: Don't drain forever! */ + notleak(peer); + + /* We no longer want subds feeding us more messages! */ + peer->subds = tal_free(peer->subds); + peer->draining = true; + + /* Clean peer from hashtable; we no longer exist. */ + destroy_peer(peer); + tal_del_destructor(peer, destroy_peer); + + /* Start draining process! */ + io_wake(peer->peer_outq); +} + void inject_peer_msg(struct peer *peer, const u8 *msg TAKES) { status_peer_io(LOG_IO_OUT, &peer->id, msg); msg_enqueue(peer->peer_outq, msg); } +void multiplex_final_msg(struct peer *peer, const u8 *final_msg TAKES) +{ + inject_peer_msg(peer, final_msg); + drain_peer(peer); +} + /* Send warning, close connection to peer */ static void send_warning(struct peer *peer, const char *fmt, ...) { va_list ap; + u8 *msg; va_start(ap, fmt); status_vfmt(LOG_UNUSUAL, &peer->id, fmt, ap); va_end(ap); - /* Close to any subdaemons. */ - peer->subds = tal_free(peer->subds); - - /* Send warning as final message. */ va_start(ap, fmt); - peer->final_msg = towire_warningfmtv(peer, NULL, fmt, ap); + msg = towire_warningfmtv(NULL, NULL, fmt, ap); va_end(ap); + + multiplex_final_msg(peer, take(msg)); } /* Kicks off write_to_peer() to look for more gossip to send from store */ @@ -934,17 +960,14 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, /* Pop tail of send queue */ msg = msg_dequeue(peer->peer_outq); - /* Is it time to send final? */ - if (!msg && peer->final_msg && tal_count(peer->subds) == 0) { - /* OK, send this then close. */ - msg = peer->final_msg; - peer->final_msg = NULL; - /* Wasn't logged earlier, so do it now */ - status_peer_io(LOG_IO_OUT, &peer->id, msg); - } - /* Still nothing to send? */ if (!msg) { + /* Draining? We're done. */ + if (peer->draining) { + set_closing_timer(peer, peer_conn); + return io_sock_shutdown(peer_conn); + } + /* We close once subds are all closed; or if we're not active, when told to die. */ if ((peer->active || peer->ready_to_die) @@ -1083,8 +1106,6 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, send_warning(peer, "Unexpected message %s: %s", peer_wire_name(type), tal_hex(tmpctx, decrypted)); - io_wake(peer->peer_outq); - return read_hdr_from_peer(peer_conn, peer); } @@ -1127,6 +1148,10 @@ static struct io_plan *read_hdr_from_peer(struct io_conn *peer_conn, { assert(peer->to_peer == peer_conn); + /* If we're draining, ignore all incoming. */ + if (peer->draining) + return io_halfclose(peer_conn); + /* BOLT #8: * * ### Receiving and Decrypting Messages @@ -1164,10 +1189,6 @@ static void destroy_subd(struct subd *subd) tal_arr_remove(&peer->subds, pos); - /* In case they were waiting for this to send final_msg */ - if (tal_count(peer->subds) == 0 && peer->final_msg) - msg_wake(peer->peer_outq); - /* Make sure we try to keep reading from peer, so we know if * it hangs up! */ io_wake(&peer->peer_in); @@ -1260,13 +1281,6 @@ struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn, write_to_peer(peer_conn, peer)); } -void multiplex_final_msg(struct peer *peer, const u8 *final_msg TAKES) -{ - peer->ready_to_die = true; - peer->final_msg = tal_dup_talarr(peer, u8, final_msg); - if (tal_count(peer->subds) == 0) - io_wake(peer->peer_outq); -} /* Lightningd says to send a ping */ void send_manual_ping(struct daemon *daemon, const u8 *msg)