diff --git a/common/gossip_store.c b/common/gossip_store.c index 3456a483e..9056dee01 100644 --- a/common/gossip_store.c +++ b/common/gossip_store.c @@ -11,7 +11,8 @@ #include #include -static bool timestamp_filter(const struct gossip_state *gs, u32 timestamp) +static bool timestamp_filter(u32 timestamp_min, u32 timestamp_max, + u32 timestamp) { /* BOLT #7: * @@ -20,25 +21,11 @@ static bool timestamp_filter(const struct gossip_state *gs, u32 timestamp) * `timestamp_range`. */ /* Note that we turn first_timestamp & timestamp_range into an inclusive range */ - return timestamp >= gs->timestamp_min - && timestamp <= gs->timestamp_max; + return timestamp >= timestamp_min + && timestamp <= timestamp_max; } -/* Not all the data we expected was there: rewind file */ -static void failed_read(int fd, int len) -{ - if (len < 0) { - /* Grab errno before lseek overrides it */ - const char *err = strerror(errno); - status_failed(STATUS_FAIL_INTERNAL_ERROR, - "gossip_store: failed read @%"PRIu64": %s", - (u64)lseek(fd, 0, SEEK_CUR), err); - } - - lseek(fd, -len, SEEK_CUR); -} - -static void reopen_gossip_store(int *gossip_store_fd, const u8 *msg) +static size_t reopen_gossip_store(int *gossip_store_fd, const u8 *msg) { u64 equivalent_offset; int newfd; @@ -57,17 +44,16 @@ static void reopen_gossip_store(int *gossip_store_fd, const u8 *msg) status_debug("gossip_store at end, new fd moved to %"PRIu64, equivalent_offset); - lseek(newfd, equivalent_offset, SEEK_SET); close(*gossip_store_fd); *gossip_store_fd = newfd; + return equivalent_offset; } -u8 *gossip_store_iter(const tal_t *ctx, +u8 *gossip_store_next(const tal_t *ctx, int *gossip_store_fd, - struct gossip_state *gs, - struct gossip_rcvd_filter *grf, - size_t *off) + u32 timestamp_min, u32 timestamp_max, + size_t *off, size_t *end) { u8 *msg = NULL; @@ -77,16 +63,9 @@ u8 *gossip_store_iter(const tal_t *ctx, bool push; int type, r; - if (off) - r = pread(*gossip_store_fd, &hdr, sizeof(hdr), *off); - else - r = read(*gossip_store_fd, &hdr, sizeof(hdr)); - if (r != sizeof(hdr)) { - /* We expect a 0 read here at EOF */ - if (r != 0 && off) - failed_read(*gossip_store_fd, r); + r = pread(*gossip_store_fd, &hdr, sizeof(hdr), *off); + if (r != sizeof(hdr)) return NULL; - } msglen = be32_to_cpu(hdr.len); push = (msglen & GOSSIP_STORE_LEN_PUSH_BIT); @@ -94,56 +73,42 @@ u8 *gossip_store_iter(const tal_t *ctx, /* Skip any deleted entries. */ if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT) { - /* Skip over it. */ - if (off) - *off += r + msglen; - else - lseek(*gossip_store_fd, msglen, SEEK_CUR); + *off += r + msglen; continue; } checksum = be32_to_cpu(hdr.crc); timestamp = be32_to_cpu(hdr.timestamp); msg = tal_arr(ctx, u8, msglen); - if (off) - r = pread(*gossip_store_fd, msg, msglen, *off + r); - else - r = read(*gossip_store_fd, msg, msglen); - if (r != msglen) { - if (!off) - failed_read(*gossip_store_fd, r); + r = pread(*gossip_store_fd, msg, msglen, *off + r); + if (r != msglen) return NULL; - } if (checksum != crc32c(be32_to_cpu(hdr.timestamp), msg, msglen)) status_failed(STATUS_FAIL_INTERNAL_ERROR, - "gossip_store: bad checksum offset %" - PRIi64": %s", - off ? (s64)*off : - (s64)lseek(*gossip_store_fd, - 0, SEEK_CUR) - msglen, - tal_hex(tmpctx, msg)); + "gossip_store: bad checksum offset %zu" + ": %s", + *off, tal_hex(tmpctx, msg)); /* Definitely processing it now */ - if (off) - *off += sizeof(hdr) + msglen; - - /* Don't send back gossip they sent to us! */ - if (gossip_rcvd_filter_del(grf, msg)) { - msg = tal_free(msg); - continue; - } + *off += sizeof(hdr) + msglen; + if (*off > *end) + *end = *off; type = fromwire_peektype(msg); - if (type == WIRE_GOSSIP_STORE_ENDED) - reopen_gossip_store(gossip_store_fd, msg); + /* end can go backwards in this case! */ + if (type == WIRE_GOSSIP_STORE_ENDED) { + *off = *end = reopen_gossip_store(gossip_store_fd, msg); /* Ignore gossipd internal messages. */ - else if (type != WIRE_CHANNEL_ANNOUNCEMENT - && type != WIRE_CHANNEL_UPDATE - && type != WIRE_NODE_ANNOUNCEMENT) + } else if (type != WIRE_CHANNEL_ANNOUNCEMENT + && type != WIRE_CHANNEL_UPDATE + && type != WIRE_NODE_ANNOUNCEMENT) { msg = tal_free(msg); - else if (!push && !timestamp_filter(gs, timestamp)) + } else if (!push && + !timestamp_filter(timestamp_min, timestamp_max, + timestamp)) { msg = tal_free(msg); + } } return msg; diff --git a/common/gossip_store.h b/common/gossip_store.h index e1d760860..b5d8fad19 100644 --- a/common/gossip_store.h +++ b/common/gossip_store.h @@ -40,12 +40,13 @@ struct gossip_hdr { * Direct store accessor: loads gossip msg from store. * * Returns NULL if there are no more gossip msgs. + * Updates *end if the known end of file has moved. + * Updates *gossip_store_fd if file has been compacted. */ -u8 *gossip_store_iter(const tal_t *ctx, +u8 *gossip_store_next(const tal_t *ctx, int *gossip_store_fd, - struct gossip_state *gs, - struct gossip_rcvd_filter *grf, - size_t *off); + u32 timestamp_min, u32 timestamp_max, + size_t *off, size_t *end); /** * Gossipd will be writing to this, and it's not atomic! Safest diff --git a/common/per_peer_state.h b/common/per_peer_state.h index a81360ab3..af41d95f0 100644 --- a/common/per_peer_state.h +++ b/common/per_peer_state.h @@ -6,13 +6,6 @@ #include #include -struct gossip_state { - /* Time for next gossip burst. */ - struct timemono next_gossip; - /* Timestamp filtering for gossip. */ - u32 timestamp_min, timestamp_max; -}; - /* Things we hand between daemons to talk to peers. */ struct per_peer_state { /* If not -1, closed on freeing */ diff --git a/connectd/connectd.c b/connectd/connectd.c index fc02c3242..9aab4c9d2 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -357,7 +357,6 @@ static struct peer *new_peer(struct daemon *daemon, peer->urgent = false; peer->peer_outq = msg_queue_new(peer); peer->subd_outq = msg_queue_new(peer); - peer->grf = new_gossip_rcvd_filter(peer); /* Aim for connection to shuffle data back and forth: sets up * peer->to_subd */ @@ -368,7 +367,6 @@ static struct peer *new_peer(struct daemon *daemon, peer_htable_add(&daemon->peers, peer); tal_add_destructor2(peer, destroy_peer, daemon); - peer->gs = NULL; return peer; } diff --git a/connectd/connectd.h b/connectd/connectd.h index 09c79bd18..ff4057538 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -13,6 +13,20 @@ struct io_conn; struct connecting; struct wireaddr_internal; +/*~ All the gossip_store related fields are kept together for convenience. */ +struct gossip_state { + /* Is it active right now? */ + bool active; + /* Except with dev override, this fires every 60 seconds */ + struct oneshot *gossip_timer; + /* Timestamp filtering for gossip. */ + u32 timestamp_min, timestamp_max; + /* I think this is called "echo cancellation" */ + struct gossip_rcvd_filter *grf; + /* Offset within the gossip_store file */ + size_t off; +}; + /*~ We keep a hash table (ccan/htable) of peers, which tells us what peers are * already connected (by peer->id). */ struct peer { @@ -45,13 +59,8 @@ struct peer { /* Peer sent buffer (for freeing after sending) */ const u8 *sent_to_peer; - /* Gossip store. */ - struct gossip_state *gs; - /* FIXME: move into gs. */ - struct gossip_rcvd_filter *grf; - size_t gossip_store_off; - - struct oneshot *gossip_timer; + /* We stream from the gossip_store for them, when idle */ + struct gossip_state gs; }; /*~ The HTABLE_DEFINE_TYPE() macro needs a keyof() function to extract the key: diff --git a/connectd/multiplex.c b/connectd/multiplex.c index d14119f2e..ed3b8eda0 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -51,41 +51,28 @@ static void send_warning(struct peer *peer, const char *fmt, ...) va_end(ap); } -/* Either for initial setup, or when they ask by timestamp */ -static bool setup_gossip_filter(struct peer *peer, - u32 first_timestamp, - u32 timestamp_range) -{ - bool immediate_sync; +/* Kicks off write_to_peer() to look for more gossip to send from store */ +static void wake_gossip(struct peer *peer); - /* If this is the first filter, we gossip sync immediately. */ - if (!peer->gs) { - peer->gs = tal(peer, struct gossip_state); - peer->gs->next_gossip = time_mono(); - immediate_sync = true; - } else - immediate_sync = false; +static struct oneshot *gossip_stream_timer(struct peer *peer) +{ + u32 next; /* BOLT #7: * - * The receiver: - * - SHOULD send all gossip messages whose `timestamp` is greater or - * equal to `first_timestamp`, and less than `first_timestamp` plus - * `timestamp_range`. - * - MAY wait for the next outgoing gossip flush to send these. - * ... - * - SHOULD restrict future gossip messages to those whose `timestamp` - * is greater or equal to `first_timestamp`, and less than - * `first_timestamp` plus `timestamp_range`. + * A node: + *... + * - SHOULD flush outgoing gossip messages once every 60 seconds, + * independently of the arrival times of the messages. + * - Note: this results in staggered announcements that are unique + * (not duplicated). */ - peer->gs->timestamp_min = first_timestamp; - peer->gs->timestamp_max = first_timestamp + timestamp_range - 1; - /* Make sure we never leave it on an impossible value. */ - if (peer->gs->timestamp_max < peer->gs->timestamp_min) - peer->gs->timestamp_max = UINT32_MAX; + /* We shorten this for dev_fast_gossip! */ + next = GOSSIP_FLUSH_INTERVAL(peer->daemon->dev_fast_gossip); - peer->gossip_store_off = 1; - return immediate_sync; + return new_reltimer(&peer->daemon->timers, + peer, time_from_sec(next), + wake_gossip, peer); } /* This is called once we need it: otherwise, the gossip_store may not exist, @@ -111,7 +98,7 @@ void setup_peer_gossip_store(struct peer *peer, if (peer->daemon->gossip_store_fd == -1) setup_gossip_store(peer->daemon); - peer->gossip_timer = NULL; + peer->gs.grf = new_gossip_rcvd_filter(peer); /* BOLT #7: * @@ -120,10 +107,17 @@ void setup_peer_gossip_store(struct peer *peer, * - MUST NOT relay any gossip messages it did not generate itself, * unless explicitly requested. */ - if (feature_negotiated(our_features, their_features, OPT_GOSSIP_QUERIES)) + if (feature_negotiated(our_features, their_features, OPT_GOSSIP_QUERIES)) { + peer->gs.gossip_timer = NULL; + peer->gs.active = false; + peer->gs.off = 1; return; + } - setup_gossip_filter(peer, 0, UINT32_MAX); + peer->gs.gossip_timer = gossip_stream_timer(peer); + peer->gs.active = true; + peer->gs.timestamp_min = 0; + peer->gs.timestamp_max = UINT32_MAX; /* BOLT #7: * @@ -136,10 +130,12 @@ void setup_peer_gossip_store(struct peer *peer, * - SHOULD resume normal operation, as specified in the * following [Rebroadcasting](#rebroadcasting) section. */ - if (!feature_offered(their_features, OPT_INITIAL_ROUTING_SYNC)) { + if (feature_offered(their_features, OPT_INITIAL_ROUTING_SYNC)) + peer->gs.off = 1; + else { /* During tests, particularly, we find that the gossip_store * moves fast, so make sure it really does start at the end. */ - peer->gossip_store_off + peer->gs.off = find_gossip_store_end(peer->daemon->gossip_store_fd, peer->daemon->gossip_store_end); } @@ -316,8 +312,11 @@ static struct io_plan *encrypt_and_send(struct peer *peer, /* Kicks off write_to_peer() to look for more gossip to send from store */ static void wake_gossip(struct peer *peer) { - peer->gossip_timer = NULL; + peer->gs.active = true; io_wake(peer->peer_outq); + + /* And go again in 60 seconds (from now, now when we finish!) */ + peer->gs.gossip_timer = gossip_stream_timer(peer); } /* If we are streaming gossip, get something from gossip store */ @@ -326,43 +325,32 @@ static u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer) u8 *msg; /* Not streaming yet? */ - if (!peer->gs) + if (!peer->gs.active) return NULL; - /* Still waiting for timer? */ - if (peer->gossip_timer != NULL) - return NULL; - - msg = gossip_store_iter(ctx, &peer->daemon->gossip_store_fd, - peer->gs, peer->grf, &peer->gossip_store_off); - - /* Cache highest valid offset (FIXME: doesn't really work when - * gossip_store gets rewritten!) */ - if (peer->gossip_store_off > peer->daemon->gossip_store_end) - peer->daemon->gossip_store_end = peer->gossip_store_off; + /* This should be around to kick us every 60 seconds */ + assert(peer->gs.gossip_timer); +again: + msg = gossip_store_next(ctx, &peer->daemon->gossip_store_fd, + peer->gs.timestamp_min, + peer->gs.timestamp_max, + &peer->gs.off, + &peer->daemon->gossip_store_end); + /* Don't send back gossip they sent to us! */ if (msg) { + status_peer_debug(&peer->id, + "Sending gossip %s", + peer_wire_name(fromwire_peektype(msg))); + if (gossip_rcvd_filter_del(peer->gs.grf, msg)) { + msg = tal_free(msg); + goto again; + } status_peer_io(LOG_IO_OUT, &peer->id, msg); return msg; } - /* BOLT #7: - * - * A node: - *... - * - SHOULD flush outgoing gossip messages once every 60 seconds, - * independently of the arrival times of the messages. - * - Note: this results in staggered announcements that are unique - * (not duplicated). - */ - /* We do 60 seconds from *start*, not from *now* */ - peer->gs->next_gossip - = timemono_add(time_mono(), - time_from_sec(GOSSIP_FLUSH_INTERVAL( - peer->daemon->dev_fast_gossip))); - peer->gossip_timer = new_abstimer(&peer->daemon->timers, peer, - peer->gs->next_gossip, - wake_gossip, peer); + peer->gs.active = false; return NULL; } @@ -374,7 +362,7 @@ static bool handle_message_locally(struct peer *peer, const u8 *msg) /* We remember these so we don't rexmit them */ if (is_msg_gossip_broadcast(msg)) - gossip_rcvd_filter_add(peer->grf, msg); + gossip_rcvd_filter_add(peer->gs.grf, msg); if (!fromwire_gossip_timestamp_filter(msg, &chain_hash, &first_timestamp, @@ -388,9 +376,21 @@ static bool handle_message_locally(struct peer *peer, const u8 *msg) return true; } - /* Returns true the first time. */ - if (setup_gossip_filter(peer, first_timestamp, timestamp_range)) + peer->gs.timestamp_min = first_timestamp; + peer->gs.timestamp_max = first_timestamp + timestamp_range - 1; + /* Make sure we never leave it on an impossible value. */ + if (peer->gs.timestamp_max < peer->gs.timestamp_min) + peer->gs.timestamp_max = UINT32_MAX; + + peer->gs.off = 1; + + /* BOLT #7: + * - MAY wait for the next outgoing gossip flush to send these. + */ + /* We send immediately the first time, after that we wait. */ + if (!peer->gs.gossip_timer) wake_gossip(peer); + return true; }