From 76e2c980e11462ecd8f677fa2c4a28da48bb0aff Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Thu, 26 Jan 2017 22:47:52 +0100 Subject: [PATCH] gossip: Moving to intmap-based broadcast for the legacy daemon Moved the broadcast functionality to broadcast.[ch]. So far this includes only the enqueuing side of broadcasts, the dequeuing and actual push to the peer is daemon dependent. This also adds the broadcast_state to the routing_state and the last broadcast index to the peer for the legacy daemon. --- daemon/Makefile | 2 + daemon/broadcast.c | 39 +++++++++++++++++ daemon/broadcast.h | 43 +++++++++++++++++++ daemon/p2p_announce.c | 97 ++++++++++++------------------------------- daemon/p2p_announce.h | 1 + daemon/peer.c | 1 + daemon/peer.h | 3 ++ daemon/routing.c | 1 + daemon/routing.h | 3 ++ lightningd/Makefile | 1 + 10 files changed, 121 insertions(+), 70 deletions(-) create mode 100644 daemon/broadcast.c create mode 100644 daemon/broadcast.h diff --git a/daemon/Makefile b/daemon/Makefile index e035cb04d..472292176 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -19,6 +19,7 @@ DAEMON_LIB_OBJS := $(DAEMON_LIB_SRC:.c=.o) DAEMON_SRC := \ daemon/bitcoind.c \ + daemon/broadcast.c \ daemon/chaintopology.c \ daemon/channel.c \ daemon/commit_tx.c \ @@ -66,6 +67,7 @@ DAEMON_GEN_HEADERS := \ DAEMON_HEADERS := \ daemon/bitcoind.h \ + daemon/broadcast.h \ daemon/chaintopology.h \ daemon/channel.h \ daemon/commit_tx.h \ diff --git a/daemon/broadcast.c b/daemon/broadcast.c new file mode 100644 index 000000000..e9b03d3b3 --- /dev/null +++ b/daemon/broadcast.c @@ -0,0 +1,39 @@ +#include "daemon/broadcast.h" + +struct broadcast_state *new_broadcast_state(tal_t *ctx) +{ + struct broadcast_state *bstate = tal(ctx, struct broadcast_state); + uintmap_init(&bstate->broadcasts); + /* Skip 0 because we initialize peers with 0 */ + bstate->next_index = 1; + return bstate; +} + +static struct queued_message *new_queued_message(tal_t *ctx, + const int type, + const u8 *tag, + const u8 *payload) +{ + struct queued_message *msg = tal(ctx, struct queued_message); + msg->type = type; + msg->tag = tal_dup_arr(msg, u8, tag, tal_count(tag), 0); + msg->payload = tal_dup_arr(msg, u8, payload, tal_count(payload), 0); + return msg; +} + +void queue_broadcast(struct broadcast_state *bstate, + const int type, + const u8 *tag, + const u8 *payload) +{ + struct queued_message *msg = new_queued_message(bstate, type, tag, payload); + + /*FIXME(cdecker) Walk through old messages and purge collisions */ + uintmap_add(&bstate->broadcasts, bstate->next_index, msg); + bstate->next_index += 1; +} + +struct queued_message *next_broadcast_message(struct broadcast_state *bstate, u64 *last_index) +{ + return uintmap_after(&bstate->broadcasts, last_index); +} diff --git a/daemon/broadcast.h b/daemon/broadcast.h new file mode 100644 index 000000000..d9a79b5c2 --- /dev/null +++ b/daemon/broadcast.h @@ -0,0 +1,43 @@ +#ifndef LIGHTNING_DAEMON_BROADCAST_H +#define LIGHTNING_DAEMON_BROADCAST_H +#include "config.h" + +#include +#include +#include +#include + +/* Common functionality to implement staggered broadcasts with replacement. */ + +struct queued_message { + int type; + + /* Unique tag specifying the msg origin */ + void *tag; + + /* Timestamp for `channel_update`s and `node_announcement`s, 0 + * for `channel_announcement`s */ + /*u32 timestamp;*/ + + /* Serialized payload */ + u8 *payload; + + //FIXME(cdecker) Remove after migrating to intmap + struct list_node list; +}; + +struct broadcast_state { +u32 next_index; +UINTMAP(struct queued_message *) broadcasts; +}; + +struct broadcast_state *new_broadcast_state(tal_t *ctx); + +void queue_broadcast(struct broadcast_state *bstate, + const int type, + const u8 *tag, + const u8 *payload); + +struct queued_message *next_broadcast_message(struct broadcast_state *bstate, u64 *last_index); + +#endif /* LIGHTNING_DAEMON_BROADCAST_H */ diff --git a/daemon/p2p_announce.c b/daemon/p2p_announce.c index 76531f285..2e040a41f 100644 --- a/daemon/p2p_announce.c +++ b/daemon/p2p_announce.c @@ -1,3 +1,4 @@ +#include "daemon/broadcast.h" #include "daemon/chaintopology.h" #include "daemon/log.h" #include "daemon/p2p_announce.h" @@ -12,62 +13,6 @@ #include #include -struct queued_message { - int type; - - /* Unique tag specifying the msg origin */ - void *tag; - - /* Timestamp for `channel_update`s and `node_announcement`s, 0 - * for `channel_announcement`s */ - u32 timestamp; - - /* Serialized payload */ - u8 *payload; - - struct list_node list; -}; - -static void broadcast(struct lightningd_state *dstate, - int type, u8 *pkt, - struct peer *origin) -{ - struct peer *p; - list_for_each(&dstate->peers, p, list) { - if (state_is_normal(p->state) && origin != p) - queue_pkt_nested(p, type, pkt); - } -} - -static void queue_broadcast(struct lightningd_state *dstate, - const int type, - const u32 timestamp, - const u8 *tag, - const u8 *payload) -{ - struct queued_message *el, *msg; - list_for_each(&dstate->broadcast_queue, el, list) { - if (el->type == type && - tal_count(tag) == tal_count(el->tag) && - memcmp(el->tag, tag, tal_count(tag)) == 0 && - el->timestamp < timestamp){ - /* Found a replacement */ - el->payload = tal_free(el->payload); - el->payload = tal_dup_arr(el, u8, payload, tal_count(payload), 0); - el->timestamp = timestamp; - return; - } - } - - /* No match found, add a new message to the queue */ - msg = tal(dstate, struct queued_message); - msg->type = type; - msg->timestamp = timestamp; - msg->tag = tal_dup_arr(msg, u8, tag, tal_count(tag), 0); - msg->payload = tal_dup_arr(msg, u8, payload, tal_count(payload), 0); - list_add_tail(&dstate->broadcast_queue, &msg->list); -} - void handle_channel_announcement( struct peer *peer, const u8 *announce, size_t len) @@ -124,8 +69,7 @@ void handle_channel_announcement( u8 *tag = tal_arr(tmpctx, u8, 0); towire_channel_id(&tag, &channel_id); - queue_broadcast(peer->dstate, WIRE_CHANNEL_ANNOUNCEMENT, - 0, /* `channel_announcement`s do not have a timestamp */ + queue_broadcast(peer->dstate->rstate->broadcasts, WIRE_CHANNEL_ANNOUNCEMENT, tag, serialized); tal_free(tmpctx); @@ -194,9 +138,8 @@ void handle_channel_update(struct peer *peer, const u8 *update, size_t len) u8 *tag = tal_arr(tmpctx, u8, 0); towire_channel_id(&tag, &channel_id); - queue_broadcast(peer->dstate, + queue_broadcast(peer->dstate->rstate->broadcasts, WIRE_CHANNEL_UPDATE, - timestamp, tag, serialized); @@ -265,9 +208,8 @@ void handle_node_announcement( u8 *tag = tal_arr(tmpctx, u8, 0); towire_pubkey(&tag, &node_id); - queue_broadcast(peer->dstate, + queue_broadcast(peer->dstate->rstate->broadcasts, WIRE_NODE_ANNOUNCEMENT, - timestamp, tag, serialized); tal_free(node->node_announcement); @@ -309,8 +251,9 @@ static void broadcast_channel_update(struct lightningd_state *dstate, struct pee 1, dstate->config.fee_base, dstate->config.fee_per_satoshi); - - broadcast(dstate, WIRE_CHANNEL_UPDATE, serialized, NULL); + u8 *tag = tal_arr(tmpctx, u8, 0); + towire_channel_id(&tag, &channel_id); + queue_broadcast(dstate->rstate->broadcasts, WIRE_CHANNEL_UPDATE, tag, serialized); tal_free(tmpctx); } @@ -346,7 +289,10 @@ static void broadcast_node_announcement(struct lightningd_state *dstate) &dstate->id, rgb_color, alias, NULL, address); - broadcast(dstate, WIRE_NODE_ANNOUNCEMENT, serialized, NULL); + u8 *tag = tal_arr(tmpctx, u8, 0); + towire_pubkey(&tag, &dstate->id); + queue_broadcast(dstate->rstate->broadcasts, WIRE_NODE_ANNOUNCEMENT, tag, + serialized); tal_free(tmpctx); } @@ -422,7 +368,10 @@ static void broadcast_channel_announcement(struct lightningd_state *dstate, stru bitcoin_key[0], bitcoin_key[1], NULL); - broadcast(dstate, WIRE_CHANNEL_ANNOUNCEMENT, serialized, NULL); + u8 *tag = tal_arr(tmpctx, u8, 0); + towire_channel_id(&tag, &channel_id); + queue_broadcast(dstate->rstate->broadcasts, WIRE_CHANNEL_ANNOUNCEMENT, + tag, serialized); tal_free(tmpctx); } @@ -456,11 +405,19 @@ void announce_channel(struct lightningd_state *dstate, struct peer *peer) static void process_broadcast_queue(struct lightningd_state *dstate) { + struct peer *p; + struct queued_message *msg; new_reltimer(dstate, dstate, time_from_sec(30), process_broadcast_queue, dstate); - struct queued_message *el; - while ((el = list_pop(&dstate->broadcast_queue, struct queued_message, list)) != NULL) { - broadcast(dstate, el->type, el->payload, NULL); - tal_free(el); + list_for_each(&dstate->peers, p, list) { + if (!state_is_normal(p->state)) + continue; + msg = next_broadcast_message(dstate->rstate->broadcasts, + &p->broadcast_index); + while (msg != NULL) { + queue_pkt_nested(p, msg->type, msg->payload); + msg = next_broadcast_message(dstate->rstate->broadcasts, + &p->broadcast_index); + } } } diff --git a/daemon/p2p_announce.h b/daemon/p2p_announce.h index f0ab557bc..a3737b41d 100644 --- a/daemon/p2p_announce.h +++ b/daemon/p2p_announce.h @@ -1,6 +1,7 @@ #ifndef LIGHTNING_DAEMON_P2P_ANNOUNCE_H #define LIGHTNING_DAEMON_P2P_ANNOUNCE_H #include "config.h" +#include "daemon/broadcast.h" #include "daemon/lightningd.h" #include "daemon/routing.h" #include "lightningd.h" diff --git a/daemon/peer.c b/daemon/peer.c index 4da4debbf..a5069833b 100644 --- a/daemon/peer.c +++ b/daemon/peer.c @@ -2772,6 +2772,7 @@ struct peer *new_peer(struct lightningd_state *dstate, peer->fake_close = false; peer->output_enabled = true; peer->local.offer_anchor = offer_anchor; + peer->broadcast_index = 0; if (!blocks_to_rel_locktime(dstate->config.locktime_blocks, &peer->local.locktime)) fatal("Could not convert locktime_blocks"); diff --git a/daemon/peer.h b/daemon/peer.h index 7b3098cf9..086ff75ae 100644 --- a/daemon/peer.h +++ b/daemon/peer.h @@ -231,6 +231,9 @@ struct peer { /* this is where we will store their revocation preimages*/ struct shachain their_preimages; + + /* High water mark for the staggered broadcast */ + u64 broadcast_index; }; /* Mapping for id -> network address. */ diff --git a/daemon/routing.c b/daemon/routing.c index cc3886b42..a9647e65f 100644 --- a/daemon/routing.c +++ b/daemon/routing.c @@ -21,6 +21,7 @@ struct routing_state *new_routing_state(const tal_t *ctx, struct log *base_log) struct routing_state *rstate = tal(ctx, struct routing_state); rstate->base_log = base_log; rstate->nodes = empty_node_map(rstate); + rstate->broadcasts = new_broadcast_state(rstate); return rstate; } diff --git a/daemon/routing.h b/daemon/routing.h index bb494d6ff..a37f69836 100644 --- a/daemon/routing.h +++ b/daemon/routing.h @@ -2,6 +2,7 @@ #define LIGHTNING_DAEMON_ROUTING_H #include "config.h" #include "bitcoin/pubkey.h" +#include "daemon/broadcast.h" #include "wire/wire.h" #include @@ -83,6 +84,8 @@ struct routing_state { struct node_map *nodes; struct log *base_log; + + struct broadcast_state *broadcasts; }; //FIXME(cdecker) The log will have to be replaced for the new subdaemon, keeping for now to keep changes small. diff --git a/lightningd/Makefile b/lightningd/Makefile index 53a75ec7a..e6eeb1d69 100644 --- a/lightningd/Makefile +++ b/lightningd/Makefile @@ -9,6 +9,7 @@ lightningd-all: lightningd/lightningd lightningd/lightningd_hsm lightningd/light default: lightningd-all LIGHTNINGD_OLD_SRC := \ + daemon/broadcast.c \ daemon/configdir.c \ daemon/dns.c \ daemon/netaddr.c \