connectd: hold peer until we're interested.

Either because lightningd tells us it wants to talk, or because the peer
says something about a channel.

We also introduce a behavior change: we disconnect after a failed open.
We might want to modify this later, but we it's a side-effect of openingd
not holding onto idle connections.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2022-03-23 06:57:29 +10:30
parent 77b1087cdf
commit 2424b7dea8
24 changed files with 429 additions and 148 deletions

View File

@@ -312,6 +312,7 @@ static struct peer *new_peer(struct daemon *daemon,
peer->sent_to_peer = NULL;
peer->urgent = false;
peer->ready_to_die = false;
peer->active = false;
peer->peer_outq = msg_queue_new(peer, false);
#if DEVELOPER
@@ -321,11 +322,6 @@ static struct peer *new_peer(struct daemon *daemon,
peer->to_peer = conn;
/* Aim for connection to shuffle data back and forth: sets up
* peer->subds[0] */
if (!multiplex_subd_setup(peer, fd_for_subd))
return tal_free(peer);
/* Now we own it */
tal_steal(peer, peer->to_peer);
peer_htable_add(&daemon->peers, peer);
@@ -424,9 +420,9 @@ struct io_plan *peer_connected(struct io_conn *conn,
/*~ daemon_conn is a message queue for inter-daemon communication: we
* queue up the `connect_peer_connected` message to tell lightningd
* we have connected, and give the peer fd. */
* we have connected. Once it says something interesting, we tell
* it that, too. */
daemon_conn_send(daemon->master, take(msg));
daemon_conn_send_fd(daemon->master, subd_fd);
/*~ Now we set up this connection to read/write from subd */
return multiplex_peer_setup(conn, peer);
@@ -1791,7 +1787,7 @@ static void try_connect_peer(struct daemon *daemon,
existing = peer_htable_get(&daemon->peers, id);
if (existing) {
/* If it's exiting now, we've raced: reconnect after */
if (tal_count(existing->subds) != 0
if ((tal_count(existing->subds) != 0 || !existing->active)
&& existing->to_peer
&& !existing->ready_to_die) {
/* Tell it it's already connected so it doesn't
@@ -1897,9 +1893,11 @@ void peer_conn_closed(struct peer *peer)
struct connecting *connect = find_connecting(peer->daemon, &peer->id);
/* These should be closed already! */
assert(!peer->subds);
assert(tal_count(peer->subds) == 0);
assert(!peer->to_peer);
assert(peer->ready_to_die);
assert(peer->ready_to_die || !peer->active);
status_peer_debug(&peer->id, "peer_conn_closed");
/* Tell gossipd to stop asking this peer gossip queries */
daemon_conn_send(peer->daemon->gossipd,
@@ -1923,32 +1921,24 @@ void peer_conn_closed(struct peer *peer)
try_connect_one_addr(connect);
}
/* A peer is gone: clean things up. */
static void cleanup_dead_peer(struct daemon *daemon, const struct node_id *id)
{
struct peer *peer;
/* We should stay in sync with lightningd at all times. */
peer = peer_htable_get(&daemon->peers, id);
if (!peer)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"peer_disconnected unknown peer: %s",
type_to_string(tmpctx, struct node_id, id));
status_peer_debug(id, "disconnect");
/* When it's finished, it will call peer_conn_closed() */
close_peer_conn(peer);
}
/* lightningd tells us a peer should be disconnected. */
static void peer_discard(struct daemon *daemon, const u8 *msg)
{
struct node_id id;
struct peer *peer;
if (!fromwire_connectd_discard_peer(msg, &id))
master_badmsg(WIRE_CONNECTD_DISCARD_PEER, msg);
cleanup_dead_peer(daemon, &id);
/* We should stay in sync with lightningd, but this can happen
* under stress. */
peer = peer_htable_get(&daemon->peers, &id);
if (!peer)
return;
status_peer_debug(&id, "disconnect");
/* When it's finished, it will call peer_conn_closed() */
close_peer_conn(peer);
}
/* lightningd tells us to send a msg and disconnect. */
@@ -2031,6 +2021,10 @@ static struct io_plan *recv_req(struct io_conn *conn,
send_custommsg(daemon, msg);
goto out;
case WIRE_CONNECTD_PEER_MAKE_ACTIVE:
peer_make_active(daemon, msg);
goto out;
case WIRE_CONNECTD_DEV_MEMLEAK:
#if DEVELOPER
dev_connect_memleak(daemon, msg);
@@ -2041,6 +2035,7 @@ static struct io_plan *recv_req(struct io_conn *conn,
case WIRE_CONNECTD_ACTIVATE_REPLY:
case WIRE_CONNECTD_PEER_CONNECTED:
case WIRE_CONNECTD_PEER_ALREADY_CONNECTED:
case WIRE_CONNECTD_PEER_ACTIVE:
case WIRE_CONNECTD_RECONNECTED:
case WIRE_CONNECTD_CONNECT_FAILED:
case WIRE_CONNECTD_DEV_MEMLEAK_REPLY:

View File

@@ -63,6 +63,9 @@ struct peer {
* it's done). */
bool ready_to_die;
/* Has this ever been active? (i.e. ever had a subd attached?) */
bool active;
/* When socket has Nagle overridden */
bool urgent;

View File

@@ -1,4 +1,5 @@
#include <bitcoin/block.h>
#include <common/channel_id.h>
#include <common/cryptomsg.h>
#include <common/features.h>
#include <common/node_id.h>
@@ -62,7 +63,7 @@ msgdata,connectd_connect_failed,failreason,wirestring,
msgdata,connectd_connect_failed,seconds_to_delay,u32,
msgdata,connectd_connect_failed,addrhint,?wireaddr_internal,
# Connectd -> master: we got a peer. Plus fd for peer daemon
# Connectd -> master: we got a peer.
msgtype,connectd_peer_connected,2002
msgdata,connectd_peer_connected,id,node_id,
msgdata,connectd_peer_connected,addr,wireaddr_internal,
@@ -75,6 +76,18 @@ msgdata,connectd_peer_connected,features,u8,flen
msgtype,connectd_peer_disconnect_done,2006
msgdata,connectd_peer_disconnect_done,id,node_id,
# Master -> connectd: make peer active immediately (we want to talk)
msgtype,connectd_peer_make_active,2004
msgdata,connectd_peer_make_active,id,node_id,
msgdata,connectd_peer_make_active,channel_id,?channel_id,
# Connectd -> master: peer said something interesting (or you said make_active)
# Plus fd for peer daemon.
msgtype,connectd_peer_active,2005
msgdata,connectd_peer_active,id,node_id,
msgdata,connectd_peer_active,msgtype,?u16,
msgdata,connectd_peer_active,channel_id,?channel_id,
# master -> connectd: peer no longer wanted, you can disconnect.
msgtype,connectd_discard_peer,2015
msgdata,connectd_discard_peer,id,node_id,
1 #include <bitcoin/block.h>
2 #include <common/channel_id.h>
3 #include <common/cryptomsg.h>
4 #include <common/features.h>
5 #include <common/node_id.h>
63 msgdata,connectd_peer_connected,incoming,bool,
64 msgdata,connectd_peer_connected,flen,u16,
65 msgdata,connectd_peer_connected,features,u8,flen
66 # connectd -> master: peer disconnected.
67 msgtype,connectd_peer_disconnect_done,2006
68 msgdata,connectd_peer_disconnect_done,id,node_id,
69 # master -> connectd: peer no longer wanted, you can disconnect. # Master -> connectd: make peer active immediately (we want to talk)
76 msgdata,connectd_peer_final_msg,msg,u8,len msgdata,connectd_peer_active,id,node_id,
77 # connectd->master: You said to connect, but we already were. msgdata,connectd_peer_active,msgtype,?u16,
78 msgtype,connectd_peer_already_connected,2007 msgdata,connectd_peer_active,channel_id,?channel_id,
79 # master -> connectd: peer no longer wanted, you can disconnect.
80 msgtype,connectd_discard_peer,2015
81 msgdata,connectd_discard_peer,id,node_id,
82 # master -> connectd: give message to peer and disconnect.
83 msgtype,connectd_peer_final_msg,2003
84 msgdata,connectd_peer_final_msg,id,node_id,
85 msgdata,connectd_peer_final_msg,len,u16,
86 msgdata,connectd_peer_final_msg,msg,u8,len
87 # connectd->master: You said to connect, but we already were.
88 msgtype,connectd_peer_already_connected,2007
89 msgdata,connectd_peer_already_connected,id,node_id,
90 # master -> connectd: do you have a memleak?
91 msgdata,connectd_peer_already_connected,id,node_id, msgtype,connectd_dev_memleak,2033
92 # master -> connectd: do you have a memleak? msgtype,connectd_dev_memleak_reply,2133
93 msgtype,connectd_dev_memleak,2033 msgdata,connectd_dev_memleak_reply,leak,bool,

View File

@@ -401,6 +401,67 @@ void send_custommsg(struct daemon *daemon, const u8 *msg)
inject_peer_msg(peer, take(custommsg));
}
/* FIXME: fwd decl */
static struct subd *multiplex_subd_setup(struct peer *peer, int *fd_for_subd);
static struct subd *activate_peer(struct peer *peer,
const enum peer_wire *type,
const struct channel_id *channel_id)
{
int fd_for_subd;
u16 t, *tp;
struct subd *subd;
/* If it wasn't active before, it is now! */
peer->active = true;
subd = multiplex_subd_setup(peer, &fd_for_subd);
if (!subd)
return NULL;
/* wire routines want a u16, not an enum */
if (type) {
t = *type;
tp = &t;
} else {
tp = NULL;
}
/* We tell lightningd to fire up a subdaemon to handle this! */
daemon_conn_send(peer->daemon->master,
take(towire_connectd_peer_active(NULL, &peer->id,
tp,
channel_id)));
daemon_conn_send_fd(peer->daemon->master, fd_for_subd);
return subd;
}
void peer_make_active(struct daemon *daemon, const u8 *msg)
{
struct node_id id;
struct peer *peer;
struct channel_id *channel_id;
if (!fromwire_connectd_peer_make_active(msg, msg, &id, &channel_id))
master_badmsg(WIRE_CONNECTD_PEER_MAKE_ACTIVE, msg);
/* Races can happen: this might be gone by now. */
peer = peer_htable_get(&daemon->peers, &id);
if (!peer)
return;
/* Could be disconnecting now */
if (!peer->to_peer)
return;
/* Could be made active already by receiving a message (esp reestablish!) */
if (tal_count(peer->subds) != 0)
return;
if (!activate_peer(peer, NULL, channel_id))
tal_free(peer);
}
static void handle_ping_in(struct peer *peer, const u8 *msg)
{
u8 *pong;
@@ -593,7 +654,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
msg = msg_dequeue(peer->peer_outq);
/* Is it time to send final? */
if (!msg && peer->final_msg && !peer->subds) {
if (!msg && peer->final_msg && tal_count(peer->subds) == 0) {
/* OK, send this then close. */
msg = peer->final_msg;
peer->final_msg = NULL;
@@ -603,8 +664,10 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
/* Still nothing to send? */
if (!msg) {
/* We close once subds are all closed. */
if (!peer->subds) {
/* We close once subds are all closed; or if we're not
active, when told to die. */
if ((peer->active || peer->ready_to_die)
&& tal_count(peer->subds) == 0) {
set_closing_timer(peer, peer_conn);
return io_sock_shutdown(peer_conn);
}
@@ -748,10 +811,19 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
return read_hdr_from_peer(peer_conn, peer);
}
/* If we don't find a subdaemon for this, discard and keep reading. */
/* If we don't find a subdaemon for this, activate a new one. */
subd = find_subd(peer, &channel_id);
if (!subd)
return read_hdr_from_peer(peer_conn, peer);
if (!subd) {
struct channel_id channel_id;
enum peer_wire t = fromwire_peektype(decrypted);
bool has_channel_id = extract_channel_id(decrypted, &channel_id);
status_peer_debug(&peer->id, "Activating for message %s",
peer_wire_name(t));
subd = activate_peer(peer, &t,
has_channel_id ? &channel_id : NULL);
if (!subd)
return io_close(peer_conn);
}
/* Tell them to write. */
msg_enqueue(subd->outq, take(decrypted));
@@ -806,19 +878,18 @@ static void destroy_subd(struct subd *subd)
struct peer *peer = subd->peer;
size_t pos;
status_peer_debug(&peer->id,
"destroy_subd: %zu subds, to_peer conn %p, read_to_die = %u",
tal_count(peer->subds), peer->to_peer,
peer->ready_to_die);
for (pos = 0; peer->subds[pos] != subd; pos++)
assert(pos < tal_count(peer->subds));
tal_arr_remove(&peer->subds, pos);
/* Last one out frees array, sets to NULL as an indicator */
if (tal_count(peer->subds) == 0) {
peer->subds = tal_free(peer->subds);
/* In case they were waiting for this to send final_msg */
if (peer->final_msg)
msg_wake(peer->peer_outq);
}
/* In case they were waiting for this to send final_msg */
if (tal_count(peer->subds) == 0 && peer->final_msg)
msg_wake(peer->peer_outq);
/* Make sure we try to keep reading from peer, so we know if
* it hangs up! */
@@ -835,7 +906,7 @@ void close_peer_conn(struct peer *peer)
peer->ready_to_die = true;
/* Already dead? */
if (!peer->subds && !peer->to_peer) {
if (tal_count(peer->subds) == 0 && !peer->to_peer) {
peer_conn_closed(peer);
return;
}
@@ -844,7 +915,7 @@ void close_peer_conn(struct peer *peer)
msg_wake(peer->peer_outq);
}
bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd)
static struct subd *multiplex_subd_setup(struct peer *peer, int *fd_for_subd)
{
int fds[2];
struct subd *subd;
@@ -852,7 +923,7 @@ bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd)
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
status_broken("Failed to create socketpair: %s",
strerror(errno));
return false;
return NULL;
}
subd = tal(peer->subds, struct subd);
@@ -868,7 +939,7 @@ bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd)
tal_add_destructor(subd, destroy_subd);
*fd_for_subd = fds[1];
return true;
return subd;
}
static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer)
@@ -876,14 +947,15 @@ static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer)
assert(peer->to_peer == peer_conn);
peer->to_peer = NULL;
/* Flush internal connections if not already. */
if (peer->subds) {
/* Flush internal connections if any. */
if (tal_count(peer->subds) != 0) {
for (size_t i = 0; i < tal_count(peer->subds); i++)
msg_wake(peer->subds[i]->outq);
return;
}
if (peer->ready_to_die)
/* If lightningd says we're ready, or we were never had a subd, finish */
if (peer->ready_to_die || !peer->active)
peer_conn_closed(peer);
}
@@ -898,6 +970,9 @@ struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn,
peer->expecting_pong = PONG_UNEXPECTED;
set_ping_timer(peer);
/* This used to be in openingd; don't break tests. */
status_peer_debug(&peer->id, "Handed peer, entering loop");
return io_duplex(peer_conn,
read_hdr_from_peer(peer_conn, peer),
write_to_peer(peer_conn, peer));
@@ -907,7 +982,7 @@ void multiplex_final_msg(struct peer *peer, const u8 *final_msg TAKES)
{
peer->ready_to_die = true;
peer->final_msg = tal_dup_talarr(peer, u8, final_msg);
if (!peer->subds)
if (tal_count(peer->subds) == 0)
io_wake(peer->peer_outq);
}

View File

@@ -10,9 +10,6 @@ struct peer;
struct io_conn;
struct feature_set;
/* Set up peer->to_subd; sets fd_for_subd to pass to lightningd. */
bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd);
/* Take over peer_conn as peer->to_peer */
struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn,
struct peer *peer);
@@ -37,4 +34,7 @@ void send_manual_ping(struct daemon *daemon, const u8 *msg);
/* When lightningd says to send a custom message (from a plugin) */
void send_custommsg(struct daemon *daemon, const u8 *msg);
/* Lightningd wants to talk to you. */
void peer_make_active(struct daemon *daemon, const u8 *msg);
#endif /* LIGHTNING_CONNECTD_MULTIPLEX_H */