connectd: tell lightningd when disconnect is complete.

This avoids races in our tests where we assume it's sync (and is kind
of nicer).

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2022-03-23 06:56:30 +10:30
parent 6cc9f37cab
commit deecedb033
12 changed files with 83 additions and 20 deletions

View File

@@ -1905,6 +1905,10 @@ void peer_conn_closed(struct peer *peer)
daemon_conn_send(peer->daemon->gossipd, daemon_conn_send(peer->daemon->gossipd,
take(towire_gossipd_peer_gone(NULL, &peer->id))); take(towire_gossipd_peer_gone(NULL, &peer->id)));
/* Tell lightningd it's really disconnected */
daemon_conn_send(peer->daemon->master,
take(towire_connectd_peer_disconnect_done(NULL,
&peer->id)));
/* Wake up in case there's a reconnecting peer waiting in io_wait. */ /* Wake up in case there's a reconnecting peer waiting in io_wait. */
io_wake(peer); io_wake(peer);
@@ -2043,6 +2047,7 @@ static struct io_plan *recv_req(struct io_conn *conn,
case WIRE_CONNECTD_PING_REPLY: case WIRE_CONNECTD_PING_REPLY:
case WIRE_CONNECTD_GOT_ONIONMSG_TO_US: case WIRE_CONNECTD_GOT_ONIONMSG_TO_US:
case WIRE_CONNECTD_CUSTOMMSG_IN: case WIRE_CONNECTD_CUSTOMMSG_IN:
case WIRE_CONNECTD_PEER_DISCONNECT_DONE:
break; break;
} }

View File

@@ -71,6 +71,10 @@ msgdata,connectd_peer_connected,incoming,bool,
msgdata,connectd_peer_connected,flen,u16, msgdata,connectd_peer_connected,flen,u16,
msgdata,connectd_peer_connected,features,u8,flen msgdata,connectd_peer_connected,features,u8,flen
# connectd -> master: peer disconnected.
msgtype,connectd_peer_disconnect_done,2006
msgdata,connectd_peer_disconnect_done,id,node_id,
# master -> connectd: peer no longer wanted, you can disconnect. # master -> connectd: peer no longer wanted, you can disconnect.
msgtype,connectd_discard_peer,2015 msgtype,connectd_discard_peer,2015
msgdata,connectd_discard_peer,id,node_id, msgdata,connectd_discard_peer,id,node_id,
1 #include <bitcoin/block.h>
71 msgdata,connectd_peer_final_msg,len,u16, # master -> connectd: give message to peer and disconnect.
72 msgdata,connectd_peer_final_msg,msg,u8,len msgtype,connectd_peer_final_msg,2003
73 # connectd->master: You said to connect, but we already were. msgdata,connectd_peer_final_msg,id,node_id,
74 msgdata,connectd_peer_final_msg,len,u16,
75 msgdata,connectd_peer_final_msg,msg,u8,len
76 # connectd->master: You said to connect, but we already were.
77 msgtype,connectd_peer_already_connected,2007
78 msgtype,connectd_peer_already_connected,2007 msgdata,connectd_peer_already_connected,id,node_id,
79 msgdata,connectd_peer_already_connected,id,node_id, # master -> connectd: do you have a memleak?
80 # master -> connectd: do you have a memleak? msgtype,connectd_dev_memleak,2033

View File

@@ -33,13 +33,7 @@ void channel_set_owner(struct channel *channel, struct subd *owner)
if (old_owner) { if (old_owner) {
subd_release_channel(old_owner, channel); subd_release_channel(old_owner, channel);
if (channel->connected && !connects_to_peer(owner)) { if (channel->connected && !connects_to_peer(owner)) {
/* If shutting down, connectd no longer exists, /* If shutting down, connectd no longer exists */
* and we should not transfer peer to connectd.
* Only transfer to connectd if connectd is
* there to be transferred to.
*/
assert(channel->peer->connected);
channel->peer->connected = false;
if (channel->peer->ld->connectd) { if (channel->peer->ld->connectd) {
u8 *msg; u8 *msg;
msg = towire_connectd_discard_peer( msg = towire_connectd_discard_peer(
@@ -47,7 +41,8 @@ void channel_set_owner(struct channel *channel, struct subd *owner)
&channel->peer->id); &channel->peer->id);
subd_send_msg(channel->peer->ld->connectd, subd_send_msg(channel->peer->ld->connectd,
take(msg)); take(msg));
} } else
channel->peer->is_connected = false;
} }
} }
channel->connected = connects_to_peer(owner); channel->connected = connects_to_peer(owner);

View File

@@ -439,6 +439,10 @@ static unsigned connectd_msg(struct subd *connectd, const u8 *msg, const int *fd
peer_connected(connectd->ld, msg, fds[0]); peer_connected(connectd->ld, msg, fds[0]);
break; break;
case WIRE_CONNECTD_PEER_DISCONNECT_DONE:
peer_disconnect_done(connectd->ld, msg);
break;
case WIRE_CONNECTD_PEER_ALREADY_CONNECTED: case WIRE_CONNECTD_PEER_ALREADY_CONNECTED:
peer_already_connected(connectd->ld, msg); peer_already_connected(connectd->ld, msg);
break; break;

View File

@@ -194,6 +194,7 @@ static struct lightningd *new_lightningd(const tal_t *ctx)
list_head_init(&ld->sendpay_commands); list_head_init(&ld->sendpay_commands);
list_head_init(&ld->close_commands); list_head_init(&ld->close_commands);
list_head_init(&ld->ping_commands); list_head_init(&ld->ping_commands);
list_head_init(&ld->disconnect_commands);
list_head_init(&ld->waitblockheight_commands); list_head_init(&ld->waitblockheight_commands);
/*~ Tal also explicitly supports arrays: it stores the number of /*~ Tal also explicitly supports arrays: it stores the number of

View File

@@ -193,6 +193,8 @@ struct lightningd {
struct list_head close_commands; struct list_head close_commands;
/* Outstanding ping commands. */ /* Outstanding ping commands. */
struct list_head ping_commands; struct list_head ping_commands;
/* Outstanding disconnect commands. */
struct list_head disconnect_commands;
/* Maintained by invoices.c */ /* Maintained by invoices.c */
struct invoices *invoices; struct invoices *invoices;

View File

@@ -193,8 +193,6 @@ void handle_reestablish(struct lightningd *ld,
"Unknown channel for reestablish"); "Unknown channel for reestablish");
log_debug(ld->log, "Reestablish on UNKNOWN channel %s", log_debug(ld->log, "Reestablish on UNKNOWN channel %s",
type_to_string(tmpctx, struct channel_id, channel_id)); type_to_string(tmpctx, struct channel_id, channel_id));
if (peer)
peer->connected = false;
/* Unless we're shutting down */ /* Unless we're shutting down */
if (ld->connectd) if (ld->connectd)
subd_send_msg(ld->connectd, subd_send_msg(ld->connectd,

View File

@@ -100,7 +100,7 @@ struct peer *new_peer(struct lightningd *ld, u64 dbid,
peer->their_features = NULL; peer->their_features = NULL;
list_head_init(&peer->channels); list_head_init(&peer->channels);
peer->direction = node_id_idx(&peer->ld->id, &peer->id); peer->direction = node_id_idx(&peer->ld->id, &peer->id);
peer->connected = false; peer->is_connected = false;
#if DEVELOPER #if DEVELOPER
peer->ignore_htlcs = false; peer->ignore_htlcs = false;
#endif #endif
@@ -1062,7 +1062,6 @@ static void peer_connected_hook_final(struct peer_connected_hook_payload *payloa
send_error: send_error:
log_debug(ld->log, "Telling connectd to send error %s", log_debug(ld->log, "Telling connectd to send error %s",
tal_hex(tmpctx, error)); tal_hex(tmpctx, error));
peer->connected = false;
/* Get connectd to send error and close. */ /* Get connectd to send error and close. */
subd_send_msg(ld->connectd, subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &peer->id, take(towire_connectd_peer_final_msg(NULL, &peer->id,
@@ -1209,7 +1208,7 @@ void peer_connected(struct lightningd *ld, const u8 *msg, int peer_fd)
if (!peer) if (!peer)
peer = new_peer(ld, 0, &id, &hook_payload->addr, peer = new_peer(ld, 0, &id, &hook_payload->addr,
hook_payload->incoming); hook_payload->incoming);
peer->connected = true; peer->is_connected = true;
tal_steal(peer, hook_payload); tal_steal(peer, hook_payload);
hook_payload->peer = peer; hook_payload->peer = peer;
@@ -1240,6 +1239,44 @@ void peer_connected(struct lightningd *ld, const u8 *msg, int peer_fd)
plugin_hook_call_peer_connected(ld, hook_payload); plugin_hook_call_peer_connected(ld, hook_payload);
} }
struct disconnect_command {
struct list_node list;
/* Command structure. This is the parent of the close command. */
struct command *cmd;
/* node being disconnected. */
struct node_id id;
};
static void destroy_disconnect_command(struct disconnect_command *dc)
{
list_del(&dc->list);
}
void peer_disconnect_done(struct lightningd *ld, const u8 *msg)
{
struct node_id id;
struct disconnect_command *i, *next;
struct peer *p;
if (!fromwire_connectd_peer_disconnect_done(msg, &id))
fatal("Connectd gave bad PEER_DISCONNECT_DONE message %s",
tal_hex(msg, msg));
/* If we still have peer, it's disconnected now */
p = peer_by_id(ld, &id);
if (p)
p->is_connected = false;
/* Wake any disconnect commands (removes self from list) */
list_for_each_safe(&ld->disconnect_commands, i, next, list) {
if (!node_id_eq(&i->id, &id))
continue;
was_pending(command_success(i->cmd,
json_stream_success(i->cmd)));
}
}
static bool check_funding_details(const struct bitcoin_tx *tx, static bool check_funding_details(const struct bitcoin_tx *tx,
const u8 *wscript, const u8 *wscript,
struct amount_sat funding, struct amount_sat funding,
@@ -1489,12 +1526,12 @@ static void json_add_peer(struct lightningd *ld,
json_object_start(response, NULL); json_object_start(response, NULL);
json_add_node_id(response, "id", &p->id); json_add_node_id(response, "id", &p->id);
json_add_bool(response, "connected", p->connected); json_add_bool(response, "connected", p->is_connected);
/* If it's not connected, features are unreliable: we don't /* If it's not connected, features are unreliable: we don't
* store them in the database, and they would only reflect * store them in the database, and they would only reflect
* their features *last* time they connected. */ * their features *last* time they connected. */
if (p->connected) { if (p->is_connected) {
json_array_start(response, "netaddr"); json_array_start(response, "netaddr");
json_add_string(response, NULL, json_add_string(response, NULL,
type_to_string(tmpctx, type_to_string(tmpctx,
@@ -1693,6 +1730,7 @@ static struct command_result *json_disconnect(struct command *cmd,
const jsmntok_t *params) const jsmntok_t *params)
{ {
struct node_id *id; struct node_id *id;
struct disconnect_command *dc;
struct peer *peer; struct peer *peer;
struct channel *channel; struct channel *channel;
bool *force; bool *force;
@@ -1712,7 +1750,7 @@ static struct command_result *json_disconnect(struct command *cmd,
if (*force) { if (*force) {
channel_fail_reconnect(channel, channel_fail_reconnect(channel,
"disconnect command force=true"); "disconnect command force=true");
return command_success(cmd, json_stream_success(cmd)); goto wait_for_connectd;
} }
return command_fail(cmd, LIGHTNINGD, "Peer is in state %s", return command_fail(cmd, LIGHTNINGD, "Peer is in state %s",
channel_state_name(channel)); channel_state_name(channel));
@@ -1720,14 +1758,23 @@ static struct command_result *json_disconnect(struct command *cmd,
channel = peer_unsaved_channel(peer); channel = peer_unsaved_channel(peer);
if (channel) { if (channel) {
channel_unsaved_close_conn(channel, "disconnect command"); channel_unsaved_close_conn(channel, "disconnect command");
return command_success(cmd, json_stream_success(cmd)); goto wait_for_connectd;
} }
if (!peer->uncommitted_channel) { if (!peer->uncommitted_channel) {
return command_fail(cmd, LIGHTNINGD, "Peer not connected"); return command_fail(cmd, LIGHTNINGD, "Peer not connected");
} }
kill_uncommitted_channel(peer->uncommitted_channel, kill_uncommitted_channel(peer->uncommitted_channel,
"disconnect command"); "disconnect command");
return command_success(cmd, json_stream_success(cmd));
wait_for_connectd:
/* Connectd tells us when it's finally disconnected */
dc = tal(cmd, struct disconnect_command);
dc->cmd = cmd;
dc->id = *id;
list_add_tail(&cmd->ld->disconnect_commands, &dc->list);
tal_add_destructor(dc, destroy_disconnect_command);
return command_still_pending(cmd);
} }
static const struct json_command disconnect_command = { static const struct json_command disconnect_command = {

View File

@@ -31,7 +31,7 @@ struct peer {
struct list_head channels; struct list_head channels;
/* Are we connected? */ /* Are we connected? */
bool connected; bool is_connected;
/* Our (only) uncommitted channel, still opening. */ /* Our (only) uncommitted channel, still opening. */
struct uncommitted_channel *uncommitted_channel; struct uncommitted_channel *uncommitted_channel;
@@ -68,6 +68,7 @@ struct peer *peer_from_json(struct lightningd *ld,
const jsmntok_t *peeridtok); const jsmntok_t *peeridtok);
void peer_connected(struct lightningd *ld, const u8 *msg, int peer_fd); void peer_connected(struct lightningd *ld, const u8 *msg, int peer_fd);
void peer_disconnect_done(struct lightningd *ld, const u8 *msg);
/* Could be configurable. */ /* Could be configurable. */
#define OUR_CHANNEL_FLAGS CHANNEL_FLAGS_ANNOUNCE_CHANNEL #define OUR_CHANNEL_FLAGS CHANNEL_FLAGS_ANNOUNCE_CHANNEL

View File

@@ -207,6 +207,9 @@ bool fromwire_channeld_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNE
/* Generated stub for fromwire_connectd_peer_connected */ /* Generated stub for fromwire_connectd_peer_connected */
bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED) bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); } { fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_disconnect_done */
bool fromwire_connectd_peer_disconnect_done(const void *p UNNEEDED, struct node_id *id UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_disconnect_done called!\n"); abort(); }
/* Generated stub for fromwire_dualopend_dev_memleak_reply */ /* Generated stub for fromwire_dualopend_dev_memleak_reply */
bool fromwire_dualopend_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED) bool fromwire_dualopend_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED)
{ fprintf(stderr, "fromwire_dualopend_dev_memleak_reply called!\n"); abort(); } { fprintf(stderr, "fromwire_dualopend_dev_memleak_reply called!\n"); abort(); }

View File

@@ -1303,7 +1303,7 @@ def test_reserve_enforcement(node_factory, executor):
'Peer transient failure in CHANNELD_NORMAL: channeld.*' 'Peer transient failure in CHANNELD_NORMAL: channeld.*'
' CHANNEL_ERR_CHANNEL_CAPACITY_EXCEEDED' ' CHANNEL_ERR_CHANNEL_CAPACITY_EXCEEDED'
) )
assert only_one(l1.rpc.listpeers()['peers'])['connected'] is False wait_for(lambda: only_one(l1.rpc.listpeers()['peers'])['connected'] is False)
def test_ipv4_and_ipv6(node_factory): def test_ipv4_and_ipv6(node_factory):

View File

@@ -151,6 +151,9 @@ bool fromwire_channeld_sending_commitsig(const tal_t *ctx UNNEEDED, const void *
/* Generated stub for fromwire_connectd_peer_connected */ /* Generated stub for fromwire_connectd_peer_connected */
bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED) bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); } { fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_disconnect_done */
bool fromwire_connectd_peer_disconnect_done(const void *p UNNEEDED, struct node_id *id UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_disconnect_done called!\n"); abort(); }
/* Generated stub for fromwire_dualopend_dev_memleak_reply */ /* Generated stub for fromwire_dualopend_dev_memleak_reply */
bool fromwire_dualopend_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED) bool fromwire_dualopend_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED)
{ fprintf(stderr, "fromwire_dualopend_dev_memleak_reply called!\n"); abort(); } { fprintf(stderr, "fromwire_dualopend_dev_memleak_reply called!\n"); abort(); }