gossipd: don't send gossip stream, let per-peer daemons read it themselves.

Keeping the uintmap ordering all the broadcastable messages is expensive:
130MB for the million-channels project.  But now we delete obsolete entries
from the store, we can have the per-peer daemons simply read that sequentially
and stream the gossip itself.

This is the most primitive version, where all gossip is streamed;
successive patches will bring back proper handling of timestamp filtering
and initial_routing_sync.

We add a gossip_state field to track what's happening with our gossip
streaming: it's initialized in gossipd, and currently always set, but
once we handle timestamps the per-peer daemon may do it when the first
filter is sent.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2019-06-04 03:45:25 +09:30
parent a40f45af55
commit 5591c0b5d8
24 changed files with 374 additions and 266 deletions

View File

@@ -1,43 +1,119 @@
#include <assert.h>
#include <ccan/crc/crc.h>
#include <ccan/endian/endian.h>
#include <common/gossip_store.h>
#include <common/per_peer_state.h>
#include <common/status.h>
#include <common/utils.h>
#include <errno.h>
#include <inttypes.h>
#include <unistd.h>
#include <wire/gen_peer_wire.h>
u8 *gossip_store_read(const tal_t *ctx, int gossip_store_fd, u64 offset)
u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
{
beint32_t hdr[2];
u32 msglen, checksum;
u8 *msg;
u8 *msg = NULL;
if (offset == 0)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: can't access offset %"PRIu64,
offset);
if (pread(gossip_store_fd, hdr, sizeof(hdr), offset) != sizeof(hdr)) {
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: can't read hdr offset %"PRIu64
": %s",
offset, strerror(errno));
/* Don't read until we're initialized. */
if (!pps->gs)
return NULL;
while (!msg) {
beint32_t hdr[2];
u32 msglen, checksum;
int type;
if (read(pps->gossip_store_fd, hdr, sizeof(hdr)) != sizeof(hdr)) {
per_peer_state_reset_gossip_timer(pps);
return NULL;
}
/* Skip any deleted entries. */
if (be32_to_cpu(hdr[0]) & GOSSIP_STORE_LEN_DELETED_BIT) {
/* Skip over it. */
lseek(pps->gossip_store_fd,
be32_to_cpu(hdr[0]) & ~GOSSIP_STORE_LEN_DELETED_BIT,
SEEK_CUR);
continue;
}
msglen = be32_to_cpu(hdr[0]);
checksum = be32_to_cpu(hdr[1]);
msg = tal_arr(ctx, u8, msglen);
if (read(pps->gossip_store_fd, msg, msglen) != msglen)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: can't read len %u"
" ~offset %"PRIi64,
msglen,
(s64)lseek(pps->gossip_store_fd,
0, SEEK_CUR));
if (checksum != crc32c(0, msg, msglen))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: bad checksum offset %"
PRIi64": %s",
(s64)lseek(pps->gossip_store_fd,
0, SEEK_CUR) - msglen,
tal_hex(tmpctx, msg));
/* Ignore gossipd internal messages. */
type = fromwire_peektype(msg);
if (type != WIRE_CHANNEL_ANNOUNCEMENT
&& type != WIRE_CHANNEL_UPDATE
&& type != WIRE_NODE_ANNOUNCEMENT)
msg = tal_free(msg);
}
/* FIXME: We should skip over these deleted entries! */
msglen = be32_to_cpu(hdr[0]) & ~GOSSIP_STORE_LEN_DELETED_BIT;
checksum = be32_to_cpu(hdr[1]);
msg = tal_arr(ctx, u8, msglen);
if (pread(gossip_store_fd, msg, msglen, offset + sizeof(hdr)) != msglen)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: can't read len %u offset %"PRIu64,
msglen, offset);
if (checksum != crc32c(0, msg, msglen))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: bad checksum offset %"PRIu64": %s",
offset, tal_hex(tmpctx, msg));
return msg;
}
/* newfd is at offset 1. We need to adjust it to similar offset as our
* current one. */
void gossip_store_switch_fd(struct per_peer_state *pps,
int newfd, u64 offset_shorter)
{
u64 cur = lseek(pps->gossip_store_fd, SEEK_CUR, 0);
/* If we're already at end (common), we know where to go in new one. */
if (cur == lseek(pps->gossip_store_fd, SEEK_END, 0)) {
status_debug("gossip_store at end, new fd moved to %"PRIu64,
cur - offset_shorter);
assert(cur > offset_shorter);
lseek(newfd, cur - offset_shorter, SEEK_SET);
} else if (cur > offset_shorter) {
/* We're part way through. Worst case, we should move back by
* offset_shorter (that's how much the *end* moved), but in
* practice we'll probably end up retransmitting some stuff */
u64 target = cur - offset_shorter;
size_t num = 0;
status_debug("gossip_store new fd moving back %"PRIu64
" to %"PRIu64,
cur, target);
cur = 1;
while (cur < target) {
u32 msglen;
beint32_t hdr[2];
if (read(newfd, hdr, sizeof(hdr)) != sizeof(hdr))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: "
"can't read hdr offset %"PRIu64
" in new store target %"PRIu64,
cur, target);
/* Skip over it. */
msglen = (be32_to_cpu(hdr[0])
& ~GOSSIP_STORE_LEN_DELETED_BIT);
cur = lseek(newfd, msglen, SEEK_CUR);
num++;
}
status_debug("gossip_store: skipped %zu records to %"PRIu64,
num, cur);
} else
status_debug("gossip_store new fd moving back %"PRIu64
" to start (offset_shorter=%"PRIu64")",
cur, offset_shorter);
close(pps->gossip_store_fd);
pps->gossip_store_fd = newfd;
}

View File

@@ -4,6 +4,8 @@
#include <ccan/short_types/short_types.h>
#include <ccan/tal/tal.h>
struct per_peer_state;
/**
* gossip_store -- On-disk storage related information
*/
@@ -17,8 +19,15 @@
/**
* Direct store accessor: loads gossip msg from store.
*
* Doesn't return; status_failed() on error.
* Returns NULL and resets time_to_next_gossip(pps) if there are no
* more gossip msgs.
*/
u8 *gossip_store_read(const tal_t *ctx, int gossip_store_fd, u64 offset);
u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps);
/**
* Switches the gossip store fd, and gets to the correct offset.
*/
void gossip_store_switch_fd(struct per_peer_state *pps,
int newfd, u64 offset_shorter);
#endif /* LIGHTNING_COMMON_GOSSIP_STORE_H */

View File

@@ -2,6 +2,7 @@
#include <ccan/fdpass/fdpass.h>
#include <common/per_peer_state.h>
#include <unistd.h>
#include <wire/wire.h>
static void destroy_per_peer_state(struct per_peer_state *pps)
{
@@ -19,6 +20,7 @@ struct per_peer_state *new_per_peer_state(const tal_t *ctx,
struct per_peer_state *pps = tal(ctx, struct per_peer_state);
pps->cs = *cs;
pps->gs = NULL;
pps->peer_fd = pps->gossip_fd = pps->gossip_store_fd = -1;
tal_add_destructor(pps, destroy_per_peer_state);
return pps;
@@ -42,9 +44,28 @@ void per_peer_state_set_fds_arr(struct per_peer_state *pps, const int *fds)
per_peer_state_set_fds(pps, fds[0], fds[1], fds[2]);
}
void towire_gossip_state(u8 **pptr, const struct gossip_state *gs)
{
towire_u64(pptr, gs->next_gossip.ts.tv_sec);
towire_u64(pptr, gs->next_gossip.ts.tv_nsec);
}
void fromwire_gossip_state(const u8 **cursor, size_t *max,
struct gossip_state *gs)
{
gs->next_gossip.ts.tv_sec = fromwire_u64(cursor, max);
gs->next_gossip.ts.tv_nsec = fromwire_u64(cursor, max);
}
void towire_per_peer_state(u8 **pptr, const struct per_peer_state *pps)
{
towire_crypto_state(pptr, &pps->cs);
#if DEVELOPER
towire_u32(pptr, pps->dev_gossip_broadcast_msec);
#endif
towire_bool(pptr, pps->gs != NULL);
if (pps->gs)
towire_gossip_state(pptr, pps->gs);
}
void per_peer_state_fdpass_send(int fd, const struct per_peer_state *pps)
@@ -61,7 +82,56 @@ struct per_peer_state *fromwire_per_peer_state(const tal_t *ctx,
const u8 **cursor, size_t *max)
{
struct crypto_state cs;
struct per_peer_state *pps;
fromwire_crypto_state(cursor, max, &cs);
return new_per_peer_state(ctx, &cs);
pps = new_per_peer_state(ctx, &cs);
#if DEVELOPER
pps->dev_gossip_broadcast_msec = fromwire_u32(cursor, max);
#endif
if (fromwire_bool(cursor, max)) {
pps->gs = tal(pps, struct gossip_state);
fromwire_gossip_state(cursor, max, pps->gs);
}
return pps;
}
/* FIXME: Put in ccan/time */
/* Is a after b? */
static inline bool timemono_after(struct timemono a, struct timemono b)
{
return time_greater_(a.ts, b.ts);
}
bool time_to_next_gossip(const struct per_peer_state *pps,
struct timerel *t)
{
if (!pps->gs)
return false;
struct timemono now = time_mono();
if (timemono_after(now, pps->gs->next_gossip))
*t = time_from_sec(0);
else
*t = timemono_between(pps->gs->next_gossip, now);
return true;
}
/* BOLT #7:
*
* A node:
*...
* - SHOULD flush outgoing gossip messages once every 60 seconds,
* independently of the arrival times of the messages.
* - Note: this results in staggered announcements that are unique
* (not duplicated).
*/
void per_peer_state_reset_gossip_timer(struct per_peer_state *pps)
{
struct timerel t = time_from_sec(60);
#if DEVELOPER
t = time_from_msec(pps->dev_gossip_broadcast_msec);
#endif
pps->gs->next_gossip = timemono_add(time_mono(), t);
}

View File

@@ -3,19 +3,31 @@
#include "config.h"
#include <ccan/tal/tal.h>
#include <ccan/time/time.h>
#include <common/crypto_state.h>
struct gossip_state {
/* Time for next gossip burst. */
struct timemono next_gossip;
};
/* Things we hand between daemons to talk to peers. */
struct per_peer_state {
/* Cryptographic state needed to exchange messages with the peer (as
* featured in BOLT #8) */
struct crypto_state cs;
/* NULL if it's not initialized yet */
struct gossip_state *gs;
#if DEVELOPER
/* Normally 60000, but adjustable for dev mode */
u32 dev_gossip_broadcast_msec;
#endif /* DEVELOPER */
/* If not -1, closed on freeing */
int peer_fd, gossip_fd, gossip_store_fd;
};
/* Allocate a new per-peer state and add destructor to close fds if set;
* sets fds to -1. */
* sets fds to -1 and ->gs to NULL.. */
struct per_peer_state *new_per_peer_state(const tal_t *ctx,
const struct crypto_state *cs);
@@ -33,4 +45,15 @@ void per_peer_state_fdpass_send(int fd, const struct per_peer_state *pps);
struct per_peer_state *fromwire_per_peer_state(const tal_t *ctx,
const u8 **cursor, size_t *max);
void towire_gossip_state(u8 **pptr, const struct gossip_state *gs);
void fromwire_gossip_state(const u8 **cursor, size_t *max,
struct gossip_state *gs);
/* How long until we have to check gossip store, if any? */
bool time_to_next_gossip(const struct per_peer_state *pps,
struct timerel *t);
/* Reset pps->next_gossip now we've drained gossip_store */
void per_peer_state_reset_gossip_timer(struct per_peer_state *pps);
#endif /* LIGHTNING_COMMON_PER_PEER_STATE_H */

View File

@@ -22,13 +22,32 @@ u8 *peer_or_gossip_sync_read(const tal_t *ctx,
fd_set readfds;
u8 *msg;
FD_ZERO(&readfds);
FD_SET(pps->peer_fd, &readfds);
FD_SET(pps->gossip_fd, &readfds);
for (;;) {
struct timeval tv, *tptr;
struct timerel trel;
select(pps->peer_fd > pps->gossip_fd
? pps->peer_fd + 1 : pps->gossip_fd + 1,
&readfds, NULL, NULL, NULL);
if (time_to_next_gossip(pps, &trel)) {
tv = timerel_to_timeval(trel);
tptr = &tv;
} else
tptr = NULL;
FD_ZERO(&readfds);
FD_SET(pps->peer_fd, &readfds);
FD_SET(pps->gossip_fd, &readfds);
if (select(pps->peer_fd > pps->gossip_fd
? pps->peer_fd + 1 : pps->gossip_fd + 1,
&readfds, NULL, NULL, tptr) != 0)
break;
/* We timed out; look in gossip_store. Failure resets timer. */
msg = gossip_store_next(tmpctx, pps);
if (msg) {
*from_gossipd = true;
return msg;
}
}
if (FD_ISSET(pps->peer_fd, &readfds)) {
msg = sync_crypto_read(ctx, pps);
@@ -84,23 +103,16 @@ bool is_wrong_channel(const u8 *msg, const struct channel_id *expected,
return !channel_id_eq(expected, actual);
}
static void new_gossip_store(struct per_peer_state *pps, int new_gossip_store_fd)
{
close(pps->gossip_store_fd);
pps->gossip_store_fd = new_gossip_store_fd;
}
void handle_gossip_msg(struct per_peer_state *pps, const u8 *msg TAKES)
{
u8 *gossip;
u64 offset;
u64 offset_shorter;
if (fromwire_gossipd_new_store_fd(msg)) {
new_gossip_store(pps, fdpass_recv(pps->gossip_fd));
if (fromwire_gossipd_new_store_fd(msg, &offset_shorter)) {
gossip_store_switch_fd(pps, fdpass_recv(pps->gossip_fd),
offset_shorter);
goto out;
} else if (fromwire_gossipd_send_gossip_from_store(msg, &offset))
gossip = gossip_store_read(tmpctx, pps->gossip_store_fd, offset);
else
} else
/* It's a raw gossip msg: this copies or takes() */
gossip = tal_dup_arr(tmpctx, u8, msg, tal_bytelen(msg), 0);
@@ -113,7 +125,7 @@ void handle_gossip_msg(struct per_peer_state *pps, const u8 *msg TAKES)
peer_failed_connection_lost();
} else {
status_broken("Gossipd gave us bad send_gossip message %s",
tal_hex(msg, msg));
tal_hex(tmpctx, gossip));
peer_failed_connection_lost();
}