From 67aa95c1946e9ea61744c6c89f612a5dd709b14d Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 11 Dec 2017 13:46:50 +1030 Subject: [PATCH] gossipd: hand back peer, don't hand a new peer. All peers come from gossipd, and maintain an fd to talk to it. Sometimes we hand the peer back, but to avoid a race, we always recreated it. The race was that a daemon closed the gossip_fd, which made gossipd forget the peer, then master handed the peer back to gossipd. We stop the race by never closing the gossipfd, but hand it back to gossipd for closing. Now gossipd has to accept two fds, but the handling of peers is far clearer. Signed-off-by: Rusty Russell --- gossipd/gossip.c | 111 ++++++++++++++++++++++-------------- gossipd/gossip_wire.csv | 17 ++---- lightningd/gossip_control.c | 2 +- lightningd/peer_control.c | 25 ++++---- lightningd/peer_control.h | 3 - openingd/opening.c | 1 + tests/test_lightningd.py | 30 +++++----- 7 files changed, 101 insertions(+), 88 deletions(-) diff --git a/gossipd/gossip.c b/gossipd/gossip.c index f68d5c00b..ab38d880b 100644 --- a/gossipd/gossip.c +++ b/gossipd/gossip.c @@ -827,55 +827,80 @@ static struct io_plan *new_peer_got_fd(struct io_conn *conn, struct peer *peer) return daemon_conn_read_next(conn, &peer->daemon->master); } -/* Read and close fd */ -static struct io_plan *discard_peer_fd(struct io_conn *conn, int *fd) -{ - struct daemon *daemon = tal_parent(fd); - close(*fd); - tal_free(fd); - return daemon_conn_read_next(conn, &daemon->master); -} - -static struct io_plan *handle_peer(struct io_conn *conn, struct daemon *daemon, - const u8 *msg) -{ - struct peer *peer; - struct crypto_state cs; +/* This lets us read the fds in before handling anything. 
*/ +struct returning_peer { + struct daemon *daemon; struct pubkey id; - struct wireaddr addr; - u8 *gfeatures, *lfeatures; + struct crypto_state cs; u8 *inner_msg; + int peer_fd, gossip_fd; +}; - if (!fromwire_gossipctl_handle_peer(msg, msg, NULL, &id, &addr, &cs, - &gfeatures, &lfeatures, &inner_msg)) - master_badmsg(WIRE_GOSSIPCTL_HANDLE_PEER, msg); +static struct io_plan *handle_returning_peer(struct io_conn *conn, + struct returning_peer *rpeer) +{ + struct daemon *daemon = rpeer->daemon; + struct peer *peer; - /* If it already exists locally, that's probably a reconnect: - * drop this one. If it exists as remote, replace with this.*/ - peer = find_peer(daemon, &id); - if (peer) { - if (peer->local) { - int *fd = tal(daemon, int); - status_trace("handle_peer %s: duplicate, dropping", - type_to_string(trc, struct pubkey, &id)); - return io_recv_fd(conn, fd, discard_peer_fd, fd); - } - status_trace("handle_peer %s: found remote duplicate, dropping", - type_to_string(trc, struct pubkey, &id)); - tal_free(peer); + peer = find_peer(daemon, &rpeer->id); + if (!peer) + status_failed(STATUS_FAIL_INTERNAL_ERROR, + "hand_back_peer unknown peer: %s", + type_to_string(trc, struct pubkey, &rpeer->id)); + + /* We don't need the gossip_fd. We could drain it, so no gossip msgs + * are missed, but that seems overkill. */ + close(rpeer->gossip_fd); + + /* Possible if there's a reconnect: ignore handed back. 
*/ + if (peer->local) { + status_trace("hand_back_peer %s: reconnected, dropping handback", + type_to_string(trc, struct pubkey, &rpeer->id)); + + close(rpeer->peer_fd); + tal_free(rpeer); + return daemon_conn_read_next(conn, &daemon->master); } - status_trace("handle_peer %s: new peer", - type_to_string(trc, struct pubkey, &id)); - peer = new_peer(daemon, daemon, &id, &addr, &cs); - peer->gfeatures = tal_steal(peer, gfeatures); - peer->lfeatures = tal_steal(peer, lfeatures); - peer_finalized(peer); + status_trace("hand_back_peer %s: now local again", + type_to_string(trc, struct pubkey, &rpeer->id)); - if (tal_len(inner_msg)) - msg_enqueue(&peer->local->peer_out, take(inner_msg)); + /* Now we talk to peer directly again. */ + daemon_conn_clear(peer->remote); + peer->remote = tal_free(peer->remote); - return io_recv_fd(conn, &peer->local->fd, new_peer_got_fd, peer); + peer->local = new_local_peer_state(peer, &rpeer->cs); + peer->local->fd = rpeer->peer_fd; + + /* If they told us to send a message, queue it now */ + if (tal_len(rpeer->inner_msg)) + msg_enqueue(&peer->local->peer_out, take(rpeer->inner_msg)); + tal_free(rpeer); + + return new_peer_got_fd(conn, peer); +} + +static struct io_plan *read_returning_gossipfd(struct io_conn *conn, + struct returning_peer *rpeer) +{ + return io_recv_fd(conn, &rpeer->gossip_fd, + handle_returning_peer, rpeer); +} + +static struct io_plan *hand_back_peer(struct io_conn *conn, + struct daemon *daemon, + const u8 *msg) +{ + struct returning_peer *rpeer = tal(daemon, struct returning_peer); + + rpeer->daemon = daemon; + if (!fromwire_gossipctl_hand_back_peer(msg, msg, NULL, + &rpeer->id, &rpeer->cs, + &rpeer->inner_msg)) + master_badmsg(WIRE_GOSSIPCTL_HAND_BACK_PEER, msg); + + return io_recv_fd(conn, &rpeer->peer_fd, + read_returning_gossipfd, rpeer); } static struct io_plan *release_peer(struct io_conn *conn, struct daemon *daemon, @@ -1483,8 +1508,8 @@ static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn 
*master handle_forwarded_msg(conn, daemon, daemon->master.msg_in); return daemon_conn_read_next(conn, &daemon->master); - case WIRE_GOSSIPCTL_HANDLE_PEER: - return handle_peer(conn, daemon, master->msg_in); + case WIRE_GOSSIPCTL_HAND_BACK_PEER: + return hand_back_peer(conn, daemon, master->msg_in); case WIRE_GOSSIPCTL_REACH_PEER: return reach_peer(conn, daemon, master->msg_in); diff --git a/gossipd/gossip_wire.csv b/gossipd/gossip_wire.csv index 02f89903f..9c04108fb 100644 --- a/gossipd/gossip_wire.csv +++ b/gossipd/gossip_wire.csv @@ -64,17 +64,12 @@ gossipctl_release_peer_reply,,lfeatures,lflen*u8 # Gossipd -> master: reply to gossip_release_peer if we couldn't find the peer. gossipctl_release_peer_replyfail,3204 -# Gossipd -> master: take over peer, with optional msg. (+peer fd) -gossipctl_handle_peer,3013 -gossipctl_handle_peer,,id,struct pubkey -gossipctl_handle_peer,,addr,struct wireaddr -gossipctl_handle_peer,,crypto_state,struct crypto_state -gossipctl_handle_peer,,gflen,u16 -gossipctl_handle_peer,,gfeatures,gflen*u8 -gossipctl_handle_peer,,lflen,u16 -gossipctl_handle_peer,,lfeatures,lflen*u8 -gossipctl_handle_peer,,len,u16 -gossipctl_handle_peer,,msg,len*u8 +# Master -> gossipd: take back peer, with optional msg. 
(+peer fd, +gossip fd) +gossipctl_hand_back_peer,3013 +gossipctl_hand_back_peer,,id,struct pubkey +gossipctl_hand_back_peer,,crypto_state,struct crypto_state +gossipctl_hand_back_peer,,len,u16 +gossipctl_hand_back_peer,,msg,len*u8 # Pass JSON-RPC getnodes call through gossip_getnodes_request,3005 diff --git a/lightningd/gossip_control.c b/lightningd/gossip_control.c index 2bc604201..db78963d4 100644 --- a/lightningd/gossip_control.c +++ b/lightningd/gossip_control.c @@ -66,7 +66,7 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds) case WIRE_GOSSIP_RESOLVE_CHANNEL_REQUEST: case WIRE_GOSSIP_FORWARDED_MSG: case WIRE_GOSSIPCTL_REACH_PEER: - case WIRE_GOSSIPCTL_HANDLE_PEER: + case WIRE_GOSSIPCTL_HAND_BACK_PEER: case WIRE_GOSSIPCTL_RELEASE_PEER: case WIRE_GOSSIPCTL_PEER_ADDRHINT: case WIRE_GOSSIP_GET_UPDATE: diff --git a/lightningd/peer_control.c b/lightningd/peer_control.c index a04501b6a..f91f3a5a9 100644 --- a/lightningd/peer_control.c +++ b/lightningd/peer_control.c @@ -640,11 +640,10 @@ void peer_connected(struct lightningd *ld, const u8 *msg, return_to_gossipd: /* Otherwise, we hand back to gossipd, to continue. */ - msg = towire_gossipctl_handle_peer(msg, &id, &addr, &cs, - gfeatures, lfeatures, NULL); + msg = towire_gossipctl_hand_back_peer(msg, &id, &cs, NULL); subd_send_msg(ld->gossip, take(msg)); subd_send_fd(ld->gossip, peer_fd); - close(gossip_fd); + subd_send_fd(ld->gossip, gossip_fd); /* If we were waiting for connection, we succeeded. */ connect_succeeded(ld, &id); @@ -653,11 +652,10 @@ return_to_gossipd: send_error: /* Hand back to gossipd, with an error packet. 
*/ connect_failed(ld, &id, sanitize_error(msg, error, NULL)); - msg = towire_gossipctl_handle_peer(msg, &id, &addr, &cs, - gfeatures, lfeatures, error); + msg = towire_gossipctl_hand_back_peer(msg, &id, &cs, error); subd_send_msg(ld->gossip, take(msg)); subd_send_fd(ld->gossip, peer_fd); - close(gossip_fd); + subd_send_fd(ld->gossip, gossip_fd); } void peer_sent_nongossip(struct lightningd *ld, @@ -704,11 +702,10 @@ void peer_sent_nongossip(struct lightningd *ld, send_error: /* Hand back to gossipd, with an error packet. */ connect_failed(ld, id, sanitize_error(error, error, NULL)); - msg = towire_gossipctl_handle_peer(error, id, addr, cs, - gfeatures, lfeatures, error); + msg = towire_gossipctl_hand_back_peer(ld, id, cs, error); subd_send_msg(ld->gossip, take(msg)); subd_send_fd(ld->gossip, peer_fd); - close(gossip_fd); + subd_send_fd(ld->gossip, gossip_fd); tal_free(error); } @@ -2393,9 +2390,9 @@ static unsigned int opening_negotiation_failed(struct subd *openingd, u8 *err; const char *why; - /* We need the peer fd. */ + /* We need the peer fd and gossip fd. */ if (tal_count(fds) == 0) - return 1; + return 2; if (!fromwire_opening_negotiation_failed(msg, msg, NULL, &cs, &err)) { peer_internal_error(peer, @@ -2404,12 +2401,10 @@ static unsigned int opening_negotiation_failed(struct subd *openingd, return 0; } - /* FIXME: Should we save addr in peer, or should gossipd remember it? 
*/ - msg = towire_gossipctl_handle_peer(msg, &peer->id, NULL, &cs, - peer->gfeatures, peer->lfeatures, - NULL); + msg = towire_gossipctl_hand_back_peer(msg, &peer->id, &cs, NULL); subd_send_msg(openingd->ld->gossip, take(msg)); subd_send_fd(openingd->ld->gossip, fds[0]); + subd_send_fd(openingd->ld->gossip, fds[1]); why = tal_strndup(peer, (const char *)err, tal_len(err)); log_unusual(peer->log, "Opening negotiation failed: %s", why); diff --git a/lightningd/peer_control.h b/lightningd/peer_control.h index cda4610aa..20f2e1453 100644 --- a/lightningd/peer_control.h +++ b/lightningd/peer_control.h @@ -26,9 +26,6 @@ struct peer { /* ID of peer */ struct pubkey id; - /* Global and local features bitfields. */ - const u8 *gfeatures, *lfeatures; - /* Error message (iff in error state) */ u8 *error; diff --git a/openingd/opening.c b/openingd/opening.c index 9cf087c48..f10e3dd7d 100644 --- a/openingd/opening.c +++ b/openingd/opening.c @@ -92,6 +92,7 @@ static void negotiation_failed(struct state *state, bool send_error, (const u8 *)errmsg); wire_sync_write(REQ_FD, msg); fdpass_send(REQ_FD, PEER_FD); + fdpass_send(REQ_FD, GOSSIP_FD); tal_free(state); exit(0); diff --git a/tests/test_lightningd.py b/tests/test_lightningd.py index c1787bf18..85eed4575 100644 --- a/tests/test_lightningd.py +++ b/tests/test_lightningd.py @@ -221,8 +221,8 @@ class LightningDTests(BaseLightningDTests): assert ret['id'] == l2.info['id'] - l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') - l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') + l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') + l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') return l1,l2 # Returns the short channel-id: :: @@ -342,8 +342,8 @@ class LightningDTests(BaseLightningDTests): assert l2.rpc.getpeer(l1.info['id'])['state'] == 'GOSSIPING' # Both gossipds will have them as new peers once handed back. 
- l1.daemon.wait_for_log('handle_peer {}: new peer'.format(l2.info['id'])) - l2.daemon.wait_for_log('handle_peer {}: new peer'.format(l1.info['id'])) + l1.daemon.wait_for_log('hand_back_peer {}: now local again'.format(l2.info['id'])) + l2.daemon.wait_for_log('hand_back_peer {}: now local again'.format(l1.info['id'])) def test_balance(self): l1,l2 = self.connect() @@ -709,8 +709,8 @@ class LightningDTests(BaseLightningDTests): assert ret['id'] == l2.info['id'] - l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') - l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') + l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') + l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') addr = l1.rpc.newaddr()['address'] txid = l1.bitcoin.rpc.sendtoaddress(addr, 10**6 / 10**8 + 0.01) @@ -1406,7 +1406,7 @@ class LightningDTests(BaseLightningDTests): assert ret['id'] == l3.info['id'] - l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') + l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') self.fund_channel(l1, l2, 10**6) self.fund_channel(l2, l3, 10**6) @@ -1497,14 +1497,14 @@ class LightningDTests(BaseLightningDTests): ret = l1.rpc.connect(l2.info['id'], 'localhost', l2.info['port']) assert ret['id'] == l2.info['id'] - l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') - l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') + l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') + l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') ret = l2.rpc.connect(l3.info['id'], 'localhost', l3.info['port']) assert ret['id'] == l3.info['id'] - l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') - l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') + l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') + l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') c1 = self.fund_channel(l1, l2, 10**6) c2 = self.fund_channel(l2, l3, 10**6) @@ -1596,14 +1596,14 @@ class LightningDTests(BaseLightningDTests): ret = l1.rpc.connect(l2.info['id'], 'localhost', l2.info['port']) 
assert ret['id'] == l2.info['id'] - l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') - l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') + l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') + l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') ret = l2.rpc.connect(l3.info['id'], 'localhost', l3.info['port']) assert ret['id'] == l3.info['id'] - l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') - l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER') + l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') + l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER') c1 = self.fund_channel(l1, l2, 10**6) c2 = self.fund_channel(l2, l3, 10**6)