connectd: put more stuff into struct gossip_state.

We're the only ones who use it now, so put our fields inside it and
make it local.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2022-01-11 11:45:43 +10:30
parent 407a89a400
commit 6d4c56e8b6
6 changed files with 118 additions and 152 deletions

View File

@@ -11,7 +11,8 @@
#include <unistd.h>
#include <wire/peer_wire.h>
static bool timestamp_filter(const struct gossip_state *gs, u32 timestamp)
static bool timestamp_filter(u32 timestamp_min, u32 timestamp_max,
u32 timestamp)
{
/* BOLT #7:
*
@@ -20,25 +21,11 @@ static bool timestamp_filter(const struct gossip_state *gs, u32 timestamp)
* `timestamp_range`.
*/
/* Note that we turn first_timestamp & timestamp_range into an inclusive range */
return timestamp >= gs->timestamp_min
&& timestamp <= gs->timestamp_max;
return timestamp >= timestamp_min
&& timestamp <= timestamp_max;
}
/* Not all the data we expected was there: rewind file */
static void failed_read(int fd, int len)
{
if (len < 0) {
/* Grab errno before lseek overrides it */
const char *err = strerror(errno);
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: failed read @%"PRIu64": %s",
(u64)lseek(fd, 0, SEEK_CUR), err);
}
lseek(fd, -len, SEEK_CUR);
}
static void reopen_gossip_store(int *gossip_store_fd, const u8 *msg)
static size_t reopen_gossip_store(int *gossip_store_fd, const u8 *msg)
{
u64 equivalent_offset;
int newfd;
@@ -57,17 +44,16 @@ static void reopen_gossip_store(int *gossip_store_fd, const u8 *msg)
status_debug("gossip_store at end, new fd moved to %"PRIu64,
equivalent_offset);
lseek(newfd, equivalent_offset, SEEK_SET);
close(*gossip_store_fd);
*gossip_store_fd = newfd;
return equivalent_offset;
}
u8 *gossip_store_iter(const tal_t *ctx,
u8 *gossip_store_next(const tal_t *ctx,
int *gossip_store_fd,
struct gossip_state *gs,
struct gossip_rcvd_filter *grf,
size_t *off)
u32 timestamp_min, u32 timestamp_max,
size_t *off, size_t *end)
{
u8 *msg = NULL;
@@ -77,16 +63,9 @@ u8 *gossip_store_iter(const tal_t *ctx,
bool push;
int type, r;
if (off)
r = pread(*gossip_store_fd, &hdr, sizeof(hdr), *off);
else
r = read(*gossip_store_fd, &hdr, sizeof(hdr));
if (r != sizeof(hdr)) {
/* We expect a 0 read here at EOF */
if (r != 0 && off)
failed_read(*gossip_store_fd, r);
r = pread(*gossip_store_fd, &hdr, sizeof(hdr), *off);
if (r != sizeof(hdr))
return NULL;
}
msglen = be32_to_cpu(hdr.len);
push = (msglen & GOSSIP_STORE_LEN_PUSH_BIT);
@@ -94,56 +73,42 @@ u8 *gossip_store_iter(const tal_t *ctx,
/* Skip any deleted entries. */
if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT) {
/* Skip over it. */
if (off)
*off += r + msglen;
else
lseek(*gossip_store_fd, msglen, SEEK_CUR);
*off += r + msglen;
continue;
}
checksum = be32_to_cpu(hdr.crc);
timestamp = be32_to_cpu(hdr.timestamp);
msg = tal_arr(ctx, u8, msglen);
if (off)
r = pread(*gossip_store_fd, msg, msglen, *off + r);
else
r = read(*gossip_store_fd, msg, msglen);
if (r != msglen) {
if (!off)
failed_read(*gossip_store_fd, r);
r = pread(*gossip_store_fd, msg, msglen, *off + r);
if (r != msglen)
return NULL;
}
if (checksum != crc32c(be32_to_cpu(hdr.timestamp), msg, msglen))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: bad checksum offset %"
PRIi64": %s",
off ? (s64)*off :
(s64)lseek(*gossip_store_fd,
0, SEEK_CUR) - msglen,
tal_hex(tmpctx, msg));
"gossip_store: bad checksum offset %zu"
": %s",
*off, tal_hex(tmpctx, msg));
/* Definitely processing it now */
if (off)
*off += sizeof(hdr) + msglen;
/* Don't send back gossip they sent to us! */
if (gossip_rcvd_filter_del(grf, msg)) {
msg = tal_free(msg);
continue;
}
*off += sizeof(hdr) + msglen;
if (*off > *end)
*end = *off;
type = fromwire_peektype(msg);
if (type == WIRE_GOSSIP_STORE_ENDED)
reopen_gossip_store(gossip_store_fd, msg);
/* end can go backwards in this case! */
if (type == WIRE_GOSSIP_STORE_ENDED) {
*off = *end = reopen_gossip_store(gossip_store_fd, msg);
/* Ignore gossipd internal messages. */
else if (type != WIRE_CHANNEL_ANNOUNCEMENT
&& type != WIRE_CHANNEL_UPDATE
&& type != WIRE_NODE_ANNOUNCEMENT)
} else if (type != WIRE_CHANNEL_ANNOUNCEMENT
&& type != WIRE_CHANNEL_UPDATE
&& type != WIRE_NODE_ANNOUNCEMENT) {
msg = tal_free(msg);
else if (!push && !timestamp_filter(gs, timestamp))
} else if (!push &&
!timestamp_filter(timestamp_min, timestamp_max,
timestamp)) {
msg = tal_free(msg);
}
}
return msg;

View File

@@ -40,12 +40,13 @@ struct gossip_hdr {
* Direct store accessor: loads gossip msg from store.
*
* Returns NULL if there are no more gossip msgs.
* Updates *end if the known end of file has moved.
* Updates *gossip_store_fd if file has been compacted.
*/
u8 *gossip_store_iter(const tal_t *ctx,
u8 *gossip_store_next(const tal_t *ctx,
int *gossip_store_fd,
struct gossip_state *gs,
struct gossip_rcvd_filter *grf,
size_t *off);
u32 timestamp_min, u32 timestamp_max,
size_t *off, size_t *end);
/**
* Gossipd will be writing to this, and it's not atomic! Safest

View File

@@ -6,13 +6,6 @@
#include <ccan/time/time.h>
#include <common/crypto_state.h>
struct gossip_state {
/* Time for next gossip burst. */
struct timemono next_gossip;
/* Timestamp filtering for gossip. */
u32 timestamp_min, timestamp_max;
};
/* Things we hand between daemons to talk to peers. */
struct per_peer_state {
/* If not -1, closed on freeing */

View File

@@ -357,7 +357,6 @@ static struct peer *new_peer(struct daemon *daemon,
peer->urgent = false;
peer->peer_outq = msg_queue_new(peer);
peer->subd_outq = msg_queue_new(peer);
peer->grf = new_gossip_rcvd_filter(peer);
/* Aim for connection to shuffle data back and forth: sets up
* peer->to_subd */
@@ -368,7 +367,6 @@ static struct peer *new_peer(struct daemon *daemon,
peer_htable_add(&daemon->peers, peer);
tal_add_destructor2(peer, destroy_peer, daemon);
peer->gs = NULL;
return peer;
}

View File

@@ -13,6 +13,20 @@ struct io_conn;
struct connecting;
struct wireaddr_internal;
/*~ All the gossip_store related fields are kept together for convenience. */
struct gossip_state {
/* Is it active right now? */
bool active;
/* Except with dev override, this fires every 60 seconds */
struct oneshot *gossip_timer;
/* Timestamp filtering for gossip. */
u32 timestamp_min, timestamp_max;
/* I think this is called "echo cancellation" */
struct gossip_rcvd_filter *grf;
/* Offset within the gossip_store file */
size_t off;
};
/*~ We keep a hash table (ccan/htable) of peers, which tells us what peers are
* already connected (by peer->id). */
struct peer {
@@ -45,13 +59,8 @@ struct peer {
/* Peer sent buffer (for freeing after sending) */
const u8 *sent_to_peer;
/* Gossip store. */
struct gossip_state *gs;
/* FIXME: move into gs. */
struct gossip_rcvd_filter *grf;
size_t gossip_store_off;
struct oneshot *gossip_timer;
/* We stream from the gossip_store for them, when idle */
struct gossip_state gs;
};
/*~ The HTABLE_DEFINE_TYPE() macro needs a keyof() function to extract the key:

View File

@@ -51,41 +51,28 @@ static void send_warning(struct peer *peer, const char *fmt, ...)
va_end(ap);
}
/* Either for initial setup, or when they ask by timestamp */
static bool setup_gossip_filter(struct peer *peer,
u32 first_timestamp,
u32 timestamp_range)
{
bool immediate_sync;
/* Kicks off write_to_peer() to look for more gossip to send from store */
static void wake_gossip(struct peer *peer);
/* If this is the first filter, we gossip sync immediately. */
if (!peer->gs) {
peer->gs = tal(peer, struct gossip_state);
peer->gs->next_gossip = time_mono();
immediate_sync = true;
} else
immediate_sync = false;
static struct oneshot *gossip_stream_timer(struct peer *peer)
{
u32 next;
/* BOLT #7:
*
* The receiver:
* - SHOULD send all gossip messages whose `timestamp` is greater or
* equal to `first_timestamp`, and less than `first_timestamp` plus
* `timestamp_range`.
* - MAY wait for the next outgoing gossip flush to send these.
* ...
* - SHOULD restrict future gossip messages to those whose `timestamp`
* is greater or equal to `first_timestamp`, and less than
* `first_timestamp` plus `timestamp_range`.
* A node:
*...
* - SHOULD flush outgoing gossip messages once every 60 seconds,
* independently of the arrival times of the messages.
* - Note: this results in staggered announcements that are unique
* (not duplicated).
*/
peer->gs->timestamp_min = first_timestamp;
peer->gs->timestamp_max = first_timestamp + timestamp_range - 1;
/* Make sure we never leave it on an impossible value. */
if (peer->gs->timestamp_max < peer->gs->timestamp_min)
peer->gs->timestamp_max = UINT32_MAX;
/* We shorten this for dev_fast_gossip! */
next = GOSSIP_FLUSH_INTERVAL(peer->daemon->dev_fast_gossip);
peer->gossip_store_off = 1;
return immediate_sync;
return new_reltimer(&peer->daemon->timers,
peer, time_from_sec(next),
wake_gossip, peer);
}
/* This is called once we need it: otherwise, the gossip_store may not exist,
@@ -111,7 +98,7 @@ void setup_peer_gossip_store(struct peer *peer,
if (peer->daemon->gossip_store_fd == -1)
setup_gossip_store(peer->daemon);
peer->gossip_timer = NULL;
peer->gs.grf = new_gossip_rcvd_filter(peer);
/* BOLT #7:
*
@@ -120,10 +107,17 @@ void setup_peer_gossip_store(struct peer *peer,
* - MUST NOT relay any gossip messages it did not generate itself,
* unless explicitly requested.
*/
if (feature_negotiated(our_features, their_features, OPT_GOSSIP_QUERIES))
if (feature_negotiated(our_features, their_features, OPT_GOSSIP_QUERIES)) {
peer->gs.gossip_timer = NULL;
peer->gs.active = false;
peer->gs.off = 1;
return;
}
setup_gossip_filter(peer, 0, UINT32_MAX);
peer->gs.gossip_timer = gossip_stream_timer(peer);
peer->gs.active = true;
peer->gs.timestamp_min = 0;
peer->gs.timestamp_max = UINT32_MAX;
/* BOLT #7:
*
@@ -136,10 +130,12 @@ void setup_peer_gossip_store(struct peer *peer,
* - SHOULD resume normal operation, as specified in the
* following [Rebroadcasting](#rebroadcasting) section.
*/
if (!feature_offered(their_features, OPT_INITIAL_ROUTING_SYNC)) {
if (feature_offered(their_features, OPT_INITIAL_ROUTING_SYNC))
peer->gs.off = 1;
else {
/* During tests, particularly, we find that the gossip_store
* moves fast, so make sure it really does start at the end. */
peer->gossip_store_off
peer->gs.off
= find_gossip_store_end(peer->daemon->gossip_store_fd,
peer->daemon->gossip_store_end);
}
@@ -316,8 +312,11 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
/* Kicks off write_to_peer() to look for more gossip to send from store */
static void wake_gossip(struct peer *peer)
{
peer->gossip_timer = NULL;
peer->gs.active = true;
io_wake(peer->peer_outq);
/* And go again in 60 seconds (from now, now when we finish!) */
peer->gs.gossip_timer = gossip_stream_timer(peer);
}
/* If we are streaming gossip, get something from gossip store */
@@ -326,43 +325,32 @@ static u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
u8 *msg;
/* Not streaming yet? */
if (!peer->gs)
if (!peer->gs.active)
return NULL;
/* Still waiting for timer? */
if (peer->gossip_timer != NULL)
return NULL;
msg = gossip_store_iter(ctx, &peer->daemon->gossip_store_fd,
peer->gs, peer->grf, &peer->gossip_store_off);
/* Cache highest valid offset (FIXME: doesn't really work when
* gossip_store gets rewritten!) */
if (peer->gossip_store_off > peer->daemon->gossip_store_end)
peer->daemon->gossip_store_end = peer->gossip_store_off;
/* This should be around to kick us every 60 seconds */
assert(peer->gs.gossip_timer);
again:
msg = gossip_store_next(ctx, &peer->daemon->gossip_store_fd,
peer->gs.timestamp_min,
peer->gs.timestamp_max,
&peer->gs.off,
&peer->daemon->gossip_store_end);
/* Don't send back gossip they sent to us! */
if (msg) {
status_peer_debug(&peer->id,
"Sending gossip %s",
peer_wire_name(fromwire_peektype(msg)));
if (gossip_rcvd_filter_del(peer->gs.grf, msg)) {
msg = tal_free(msg);
goto again;
}
status_peer_io(LOG_IO_OUT, &peer->id, msg);
return msg;
}
/* BOLT #7:
*
* A node:
*...
* - SHOULD flush outgoing gossip messages once every 60 seconds,
* independently of the arrival times of the messages.
* - Note: this results in staggered announcements that are unique
* (not duplicated).
*/
/* We do 60 seconds from *start*, not from *now* */
peer->gs->next_gossip
= timemono_add(time_mono(),
time_from_sec(GOSSIP_FLUSH_INTERVAL(
peer->daemon->dev_fast_gossip)));
peer->gossip_timer = new_abstimer(&peer->daemon->timers, peer,
peer->gs->next_gossip,
wake_gossip, peer);
peer->gs.active = false;
return NULL;
}
@@ -374,7 +362,7 @@ static bool handle_message_locally(struct peer *peer, const u8 *msg)
/* We remember these so we don't rexmit them */
if (is_msg_gossip_broadcast(msg))
gossip_rcvd_filter_add(peer->grf, msg);
gossip_rcvd_filter_add(peer->gs.grf, msg);
if (!fromwire_gossip_timestamp_filter(msg, &chain_hash,
&first_timestamp,
@@ -388,9 +376,21 @@ static bool handle_message_locally(struct peer *peer, const u8 *msg)
return true;
}
/* Returns true the first time. */
if (setup_gossip_filter(peer, first_timestamp, timestamp_range))
peer->gs.timestamp_min = first_timestamp;
peer->gs.timestamp_max = first_timestamp + timestamp_range - 1;
/* Make sure we never leave it on an impossible value. */
if (peer->gs.timestamp_max < peer->gs.timestamp_min)
peer->gs.timestamp_max = UINT32_MAX;
peer->gs.off = 1;
/* BOLT #7:
* - MAY wait for the next outgoing gossip flush to send these.
*/
/* We send immediately the first time, after that we wait. */
if (!peer->gs.gossip_timer)
wake_gossip(peer);
return true;
}