From 531c82b6adc76bd86fde8747e5237dc082883e44 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 4 Jun 2018 13:56:25 +0930 Subject: [PATCH] gossipd: handle gossip_timestamp_filter message. And initialize filter (to "never") when we negotiated LOCAL_GOSSIP_QUERIES, and send initial filter message. Signed-off-by: Rusty Russell --- gossipd/broadcast.c | 12 ++-- gossipd/broadcast.h | 7 +- gossipd/gossip.c | 159 +++++++++++++++++++++++++++++++++----------- 3 files changed, 132 insertions(+), 46 deletions(-) diff --git a/gossipd/broadcast.c b/gossipd/broadcast.c index 196772c4f..9695ee87a 100644 --- a/gossipd/broadcast.c +++ b/gossipd/broadcast.c @@ -51,12 +51,16 @@ void insert_broadcast(struct broadcast_state *bstate, bstate->next_index++); } -const u8 *next_broadcast(struct broadcast_state *bstate, u64 *last_index) +const u8 *next_broadcast(struct broadcast_state *bstate, + u32 timestamp_min, u32 timestamp_max, + u64 *last_index) { struct queued_message *m; - m = uintmap_after(&bstate->broadcasts, last_index); - if (m) - return m->payload; + while ((m = uintmap_after(&bstate->broadcasts, last_index)) != NULL) { + if (m->timestamp >= timestamp_min + && m->timestamp <= timestamp_max) + return m->payload; + } return NULL; } diff --git a/gossipd/broadcast.h b/gossipd/broadcast.h index e728b681f..6de0fec38 100644 --- a/gossipd/broadcast.h +++ b/gossipd/broadcast.h @@ -20,7 +20,10 @@ struct broadcast_state *new_broadcast_state(tal_t *ctx); void insert_broadcast(struct broadcast_state *bstate, const u8 *msg, u32 timestamp); -/* Return the broadcast with index >= *last_index, and update *last_index. +/* Return the broadcast with index >= *last_index, timestamp >= min and <= max + * and update *last_index. * There's no broadcast with index 0. */ -const u8 *next_broadcast(struct broadcast_state *bstate, u64 *last_index); +const u8 *next_broadcast(struct broadcast_state *bstate, + u32 timestamp_min, u32 timestamp_max, + u64 *last_index); #endif /* LIGHTNING_GOSSIPD_BROADCAST_H */ diff --git a/gossipd/gossip.c b/gossipd/gossip.c index 31b9e5f94..57cc0dc80 100644 --- a/gossipd/gossip.c +++ b/gossipd/gossip.c @@ -217,6 +217,9 @@ struct peer { /* High water mark for the staggered broadcast */ u64 broadcast_index; + /* Timestamp range to filter gossip by */ + u32 gossip_timestamp_min, gossip_timestamp_max; + /* Are there outstanding queries on short_channel_ids? */ const struct short_channel_id *scid_queries; size_t scid_query_idx; @@ -352,6 +355,8 @@ static struct peer *new_peer(const tal_t *ctx, peer->scid_query_nodes = NULL; peer->scid_query_nodes_idx = 0; peer->num_scid_queries_outstanding = 0; + peer->gossip_timestamp_min = 0; + peer->gossip_timestamp_max = UINT32_MAX; return peer; } @@ -493,6 +498,25 @@ static struct io_plan *retry_peer_connected(struct io_conn *conn, return peer_connected(conn, peer); } +static void setup_gossip_range(struct peer *peer) +{ + bool gossip_queries; + u8 *msg; + + gossip_queries = feature_offered(peer->lfeatures, LOCAL_GOSSIP_QUERIES) + && feature_offered(peer->daemon->localfeatures, + LOCAL_GOSSIP_QUERIES); + + if (!gossip_queries) + return; + + /* Tell it to start gossip! (And give us everything!) */ + msg = towire_gossip_timestamp_filter(peer, + &peer->daemon->rstate->chain_hash, + 0, UINT32_MAX); + queue_peer_msg(peer, take(msg)); +} + static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer) { struct peer *old_peer; @@ -523,16 +547,34 @@ static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer) /* BOLT #7: * - * Upon receiving an `init` message with the `initial_routing_sync` - * flag set the node sends `channel_announcement`s, `channel_update`s - * and `node_announcement`s for all known channels and nodes as if - * they were just received. + * - if the `gossip_queries` feature is negotiated: + * - MUST NOT relay any gossip messages unless explicitly requested. */ - if (feature_offered(peer->lfeatures, LOCAL_INITIAL_ROUTING_SYNC)) - peer->broadcast_index = 0; - else - peer->broadcast_index - = peer->daemon->rstate->broadcasts->next_index; + if (feature_offered(peer->lfeatures, LOCAL_GOSSIP_QUERIES) + && feature_offered(peer->daemon->localfeatures, LOCAL_GOSSIP_QUERIES)) { + peer->broadcast_index = UINT64_MAX; + /* Nothing in this range */ + peer->gossip_timestamp_min = UINT32_MAX; + peer->gossip_timestamp_max = 0; + } else { + /* BOLT #7: + * + * - upon receiving an `init` message with the + * `initial_routing_sync` flag set to 1: + * - SHOULD send `channel_announcement`s, `channel_update`s + * and `node_announcement`s for all known channels and + * nodes, as if they were just received. + * - if the `initial_routing_sync` flag is set to 0, OR if the + * initial sync was completed: + * - SHOULD resume normal operation, as specified in the + * following [Rebroadcasting](#rebroadcasting) section. + */ + if (feature_offered(peer->lfeatures, LOCAL_INITIAL_ROUTING_SYNC)) + peer->broadcast_index = 0; + else + peer->broadcast_index + = peer->daemon->rstate->broadcasts->next_index; + } /* This is a full peer now; we keep it around until master says * it's dead. */ @@ -548,6 +590,7 @@ static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer) /* Start the gossip flowing. */ wake_pkt_out(peer); + setup_gossip_range(peer); return io_close_taken_fd(conn); } @@ -821,6 +864,40 @@ static void handle_query_short_channel_ids(struct peer *peer, u8 *msg) msg_wake(&peer->remote->out); } +static void handle_gossip_timestamp_filter(struct peer *peer, u8 *msg) +{ + struct bitcoin_blkid chain_hash; + u32 first_timestamp, timestamp_range; + + if (!fromwire_gossip_timestamp_filter(msg, &chain_hash, + &first_timestamp, + ×tamp_range)) { + peer_error(peer, "Bad gossip_timestamp_filter %s", + tal_hex(tmpctx, msg)); + return; + } + + if (!structeq(&peer->daemon->rstate->chain_hash, &chain_hash)) { + status_trace("%s sent gossip_timestamp_filter chainhash %s", + type_to_string(tmpctx, struct pubkey, &peer->id), + type_to_string(tmpctx, struct bitcoin_blkid, + &chain_hash)); + return; + } + + /* First time, start gossip sync immediately. */ + if (peer->gossip_timestamp_min > peer->gossip_timestamp_max) + wake_pkt_out(peer); + + /* FIXME: We don't index by timestamp, so this forces a brute + * search! */ + peer->gossip_timestamp_min = first_timestamp; + peer->gossip_timestamp_max = first_timestamp + timestamp_range - 1; + if (peer->gossip_timestamp_max < peer->gossip_timestamp_min) + peer->gossip_timestamp_max = UINT32_MAX; + peer->broadcast_index = 0; +} + static void handle_ping(struct peer *peer, u8 *ping) { u8 *pong; @@ -963,6 +1040,10 @@ static struct io_plan *peer_msgin(struct io_conn *conn, handle_reply_short_channel_ids_end(peer, msg); return peer_next_in(conn, peer); + case WIRE_GOSSIP_TIMESTAMP_FILTER: + handle_gossip_timestamp_filter(peer, msg); + return peer_next_in(conn, peer); + case WIRE_OPEN_CHANNEL: case WIRE_CHANNEL_REESTABLISH: case WIRE_ACCEPT_CHANNEL: @@ -989,7 +1070,6 @@ static struct io_plan *peer_msgin(struct io_conn *conn, case WIRE_QUERY_CHANNEL_RANGE: case WIRE_REPLY_CHANNEL_RANGE: - case WIRE_GOSSIP_TIMESTAMP_FILTER: /* FIXME: Implement */ return peer_next_in(conn, peer); } @@ -1161,6 +1241,29 @@ static bool create_next_scid_reply(struct peer *peer) return sent; } +/* If we're supposed to be sending gossip, do so now. */ +static bool maybe_queue_gossip(struct peer *peer) +{ + const u8 *next; + + if (!peer->gossip_sync) + return false; + + next = next_broadcast(peer->daemon->rstate->broadcasts, + peer->gossip_timestamp_min, + peer->gossip_timestamp_max, + &peer->broadcast_index); + + if (next) { + queue_peer_msg(peer, next); + return true; + } + + /* Gossip is drained. Wait for next timer. */ + peer->gossip_sync = false; + return false; +} + static struct io_plan *peer_pkt_out(struct io_conn *conn, struct peer *peer) { /* First priority is queued packets, if any */ @@ -1183,20 +1286,8 @@ again: return ready_for_master(conn, peer); } else if (create_next_scid_reply(peer)) { goto again; - } else if (peer->gossip_sync) { - /* If we're supposed to be sending gossip, do so now. */ - const u8 *next; - - next = next_broadcast(peer->daemon->rstate->broadcasts, - &peer->broadcast_index); - - if (next) - return peer_write_message(conn, &peer->local->pcs, - next, - peer_pkt_out); - - /* Gossip is drained. Wait for next timer. */ - peer->gossip_sync = false; + } else if (maybe_queue_gossip(peer)) { + goto again; } return msg_queue_wait(conn, &peer->local->peer_out, peer_pkt_out, peer); @@ -1397,6 +1488,8 @@ static struct io_plan *owner_msg_in(struct io_conn *conn, handle_query_short_channel_ids(peer, dc->msg_in); } else if (type == WIRE_REPLY_SHORT_CHANNEL_IDS_END) { handle_reply_short_channel_ids_end(peer, dc->msg_in); + } else if (type == WIRE_GOSSIP_TIMESTAMP_FILTER) { + handle_gossip_timestamp_filter(peer, dc->msg_in); } else if (type == WIRE_GOSSIP_GET_UPDATE) { handle_get_update(peer, dc->msg_in); } else if (type == WIRE_GOSSIP_LOCAL_ADD_CHANNEL) { @@ -1465,7 +1558,6 @@ static bool send_peer_with_fds(struct peer *peer, const u8 *msg) */ static bool nonlocal_dump_gossip(struct io_conn *conn, struct daemon_conn *dc) { - const u8 *next; struct peer *peer = dc->ctx; /* Make sure we are not connected directly */ @@ -1475,21 +1567,8 @@ static bool nonlocal_dump_gossip(struct io_conn *conn, struct daemon_conn *dc) if (create_next_scid_reply(peer)) return true; - /* Nothing to do if we're not gossiping */ - if (!peer->gossip_sync) - return false; - - next = next_broadcast(peer->daemon->rstate->broadcasts, - &peer->broadcast_index); - - if (!next) { - peer->gossip_sync = false; - return false; - } else { - u8 *msg = towire_gossip_send_gossip(NULL, next); - daemon_conn_send(peer->remote, take(msg)); - return true; - } + /* Otherwise queue any gossip we want to send */ + return maybe_queue_gossip(peer); } static struct io_plan *new_peer_got_fd(struct io_conn *conn, struct peer *peer)