diff --git a/plugins/Makefile b/plugins/Makefile index 6d92bb880..fad17d248 100644 --- a/plugins/Makefile +++ b/plugins/Makefile @@ -218,7 +218,7 @@ plugins/sql-schema_gen.h: plugins/Makefile $(SQL_LISTRPCS_SCHEMAS) @$(call VERBOSE,GEN $@, (SEP=""; echo -n '"{'; for f in $(SQL_LISTRPCS); do echo -n "$$SEP\\\"$$f\\\":"; sed -e s/\"description\":\ *\".\*\"/\"\":\"\"/ -e s/\".*\":\ *{}/\"\":{}/ -e s/\"/\\\\\"/g < doc/schemas/$$f.schema.json | tr -d ' \n'; SEP=","; done; echo '}"') > $@) plugins/sql.o: plugins/sql-schema_gen.h -plugins/sql: $(PLUGIN_SQL_OBJS) $(PLUGIN_LIB_OBJS) $(PLUGIN_COMMON_OBJS) $(JSMN_OBJS) +plugins/sql: $(PLUGIN_SQL_OBJS) $(PLUGIN_LIB_OBJS) $(PLUGIN_COMMON_OBJS) $(JSMN_OBJS) common/gossip_store.o gossipd/gossip_store_wiregen.o # Generated from PLUGINS definition in plugins/Makefile ALL_C_HEADERS += plugins/list_of_builtin_plugins_gen.h diff --git a/plugins/sql.c b/plugins/sql.c index f2ef3a8b8..00d5a98fe 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -4,11 +4,19 @@ #include #include #include +#include #include #include #include +#include +#include +#include +#include #include #include +#include +#include +#include /* Minimized schemas. C23 #embed, Where Art Thou? */ static const char schemas[] = @@ -107,6 +115,8 @@ static STRMAP(struct table_desc *) tablemap; static size_t max_dbmem = 500000000; static struct sqlite3 *db; static const char *dbfilename; +static int gosstore_fd = -1; +static size_t gosstore_nodes_off = 0, gosstore_channels_off = 0; /* It was tempting to put these in the schema, but they're really * just for our usage. Though that would allow us to autogen the @@ -622,6 +632,285 @@ static struct command_result *default_refresh(struct command *cmd, return send_outreq(cmd->plugin, req); } +static bool extract_scid(int gosstore_fd, size_t off, u16 type, + struct short_channel_id *scid) +{ + be64 raw; + + /* BOLT #7: + * 1. type: 258 (`channel_update`) + * 2. data: + * * [`signature`:`signature`] + * * [`chain_hash`:`chain_hash`] + * * [`short_channel_id`:`short_channel_id`] + */ + /* Note that first two bytes are message type */ + const size_t update_scid_off = 2 + (64 + 32); + + off += sizeof(struct gossip_hdr); + /* For delete_chan scid immediately follows type */ + if (type == WIRE_GOSSIP_STORE_DELETE_CHAN) + off += 2; + else if (type == WIRE_GOSSIP_STORE_PRIVATE_UPDATE) + /* Prepend header */ + off += 2 + 2 + update_scid_off; + else if (type == WIRE_CHANNEL_UPDATE) + off += update_scid_off; + else + abort(); + + if (pread(gosstore_fd, &raw, sizeof(raw), off) != sizeof(raw)) + return false; + scid->u64 = be64_to_cpu(raw); + return true; +} + +/* Note: this deletes up to two rows, one for each direction. */ +static void delete_channel_from_db(struct command *cmd, + struct short_channel_id scid) +{ + int err; + char *errmsg; + + err = sqlite3_exec(db, + tal_fmt(tmpctx, + "DELETE FROM channels" + " WHERE short_channel_id = '%s'", + short_channel_id_to_str(tmpctx, &scid)), + NULL, NULL, &errmsg); + if (err != SQLITE_OK) + plugin_err(cmd->plugin, "Could not delete from channels: %s", + errmsg); +} + +static struct command_result *channels_refresh(struct command *cmd, + const struct table_desc *td, + struct db_query *dbq); + +static struct command_result *listchannels_one_done(struct command *cmd, + const char *buf, + const jsmntok_t *result, + struct db_query *dbq) +{ + const struct table_desc *td = dbq->tables[0]; + struct command_result *ret; + + ret = process_json_result(cmd, buf, result, td); + if (ret) + return ret; + + /* Continue to refresh more channels */ + return channels_refresh(cmd, td, dbq); +} + +static struct command_result *channels_refresh(struct command *cmd, + const struct table_desc *td, + struct db_query *dbq) +{ + struct out_req *req; + size_t msglen; + u16 type, flags; + + if (gosstore_fd == -1) { + gosstore_fd = open("gossip_store", O_RDONLY); + if (gosstore_fd == -1) + plugin_err(cmd->plugin, "Could not open gossip_store: %s", + strerror(errno)); + } + + /* First time, set off to end and load from scratch */ + if (gosstore_channels_off == 0) { + gosstore_channels_off = find_gossip_store_end(gosstore_fd, 1); + return default_refresh(cmd, td, dbq); + } + + plugin_log(cmd->plugin, LOG_DBG, "Refreshing channels @%zu...", + gosstore_channels_off); + + /* OK, try catching up! */ + while (gossip_store_readhdr(gosstore_fd, gosstore_channels_off, + &msglen, NULL, &flags, &type)) { + struct short_channel_id scid; + size_t off = gosstore_channels_off; + + gosstore_channels_off += sizeof(struct gossip_hdr) + msglen; + + if (flags & GOSSIP_STORE_DELETED_BIT) + continue; + + if (type == WIRE_GOSSIP_STORE_ENDED) { + /* Force a reopen */ + gosstore_channels_off = gosstore_nodes_off = 0; + close(gosstore_fd); + gosstore_fd = -1; + return channels_refresh(cmd, td, dbq); + } + + /* If we see a channel_announcement, we don't care until we + * see the channel_update */ + if (type == WIRE_CHANNEL_UPDATE + || type == WIRE_GOSSIP_STORE_PRIVATE_UPDATE) { + /* This can fail if entry not fully written yet. */ + if (!extract_scid(gosstore_fd, off, type, &scid)) { + gosstore_channels_off = off; + break; + } + + plugin_log(cmd->plugin, LOG_DBG, "Refreshing channel: %s", + type_to_string(tmpctx, struct short_channel_id, &scid)); + /* FIXME: sqlite 3.24.0 (2018-06-04) added UPSERT, but + * we don't require it. */ + delete_channel_from_db(cmd, scid); + req = jsonrpc_request_start(cmd->plugin, cmd, "listchannels", + listchannels_one_done, + forward_error, + dbq); + json_add_short_channel_id(req->js, "short_channel_id", &scid); + return send_outreq(cmd->plugin, req); + } else if (type == WIRE_GOSSIP_STORE_DELETE_CHAN) { + /* This can fail if entry not fully written yet. */ + if (!extract_scid(gosstore_fd, off, type, &scid)) { + gosstore_channels_off = off; + break; + } + plugin_log(cmd->plugin, LOG_DBG, "Deleting channel: %s", + type_to_string(tmpctx, struct short_channel_id, &scid)); + delete_channel_from_db(cmd, scid); + } + } + + return one_refresh_done(cmd, dbq); +} + +static struct command_result *nodes_refresh(struct command *cmd, + const struct table_desc *td, + struct db_query *dbq); + +static struct command_result *listnodes_one_done(struct command *cmd, + const char *buf, + const jsmntok_t *result, + struct db_query *dbq) +{ + const struct table_desc *td = dbq->tables[0]; + struct command_result *ret; + + ret = process_json_result(cmd, buf, result, td); + if (ret) + return ret; + + /* Continue to refresh more nodes */ + return nodes_refresh(cmd, td, dbq); +} + +static void delete_node_from_db(struct command *cmd, + const struct node_id *id) +{ + int err; + char *errmsg; + + err = sqlite3_exec(db, + tal_fmt(tmpctx, + "DELETE FROM nodes" + " WHERE nodeid = %s", + node_id_to_hexstr(tmpctx, id)), + NULL, NULL, &errmsg); + if (err != SQLITE_OK) + plugin_err(cmd->plugin, "Could not delete from nodes: %s", + errmsg); +} + +static bool extract_node_id(int gosstore_fd, size_t off, u16 type, + struct node_id *id) +{ + /* BOLT #7: + * 1. type: 257 (`node_announcement`) + * 2. data: + * * [`signature`:`signature`] + * * [`u16`:`flen`] + * * [`flen*byte`:`features`] + * * [`u32`:`timestamp`] + * * [`point`:`node_id`] + */ + const size_t feature_len_off = 2 + 64; + be16 flen; + size_t node_id_off; + + off += sizeof(struct gossip_hdr); + + if (pread(gosstore_fd, &flen, sizeof(flen), off + feature_len_off) + != sizeof(flen)) + return false; + + node_id_off = off + feature_len_off + 2 + flen + 4; + if (pread(gosstore_fd, id, sizeof(*id), node_id_off) != sizeof(*id)) + return false; + + return true; +} + +static struct command_result *nodes_refresh(struct command *cmd, + const struct table_desc *td, + struct db_query *dbq) +{ + struct out_req *req; + size_t msglen; + u16 type, flags; + + if (gosstore_fd == -1) { + gosstore_fd = open("gossip_store", O_RDONLY); + if (gosstore_fd == -1) + plugin_err(cmd->plugin, "Could not open gossip_store: %s", + strerror(errno)); + } + + /* First time, set off to end and load from scratch */ + if (gosstore_nodes_off == 0) { + gosstore_nodes_off = find_gossip_store_end(gosstore_fd, 1); + return default_refresh(cmd, td, dbq); + } + + /* OK, try catching up! */ + while (gossip_store_readhdr(gosstore_fd, gosstore_nodes_off, + &msglen, NULL, &flags, &type)) { + struct node_id id; + size_t off = gosstore_nodes_off; + + gosstore_nodes_off += sizeof(struct gossip_hdr) + msglen; + + if (flags & GOSSIP_STORE_DELETED_BIT) + continue; + + if (type == WIRE_GOSSIP_STORE_ENDED) { + /* Force a reopen */ + gosstore_nodes_off = gosstore_channels_off = 0; + close(gosstore_fd); + gosstore_fd = -1; + return nodes_refresh(cmd, td, dbq); + } + + if (type == WIRE_NODE_ANNOUNCEMENT) { + /* This can fail if entry not fully written yet. */ + if (!extract_node_id(gosstore_fd, off, type, &id)) { + gosstore_nodes_off = off; + break; + } + + /* FIXME: sqlite 3.24.0 (2018-06-04) added UPSERT, but + * we don't require it. */ + delete_node_from_db(cmd, &id); + req = jsonrpc_request_start(cmd->plugin, cmd, "listnodes", + listnodes_one_done, + forward_error, + dbq); + json_add_node_id(req->js, "id", &id); + return send_outreq(cmd->plugin, req); + } + /* FIXME: Add WIRE_GOSSIP_STORE_DELETE_NODE marker! */ + } + + return one_refresh_done(cmd, dbq); +} + static struct command_result *refresh_tables(struct command *cmd, struct db_query *dbq) { @@ -806,7 +1095,12 @@ static struct table_desc *new_table_desc(struct table_desc *parent, td->is_subobject = is_subobject; td->arrname = json_strdup(td, schemas, arrname); td->columns = tal_arr(td, struct column, 0); - td->refresh = default_refresh; + if (streq(td->name, "channels")) + td->refresh = channels_refresh; + else if (streq(td->name, "nodes")) + td->refresh = nodes_refresh; + else + td->refresh = default_refresh; return td; } diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 75a460e4a..484db96a0 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -3734,7 +3734,7 @@ def test_sql(node_factory, bitcoind): # And I need at least one HTLC in-flight so listpeers.channels.htlcs isn't empty: l3.rpc.plugin_start(os.path.join(os.getcwd(), 'tests/plugins/hold_invoice.py'), - holdtime=10000) + holdtime=TIMEOUT * 2) inv = l3.rpc.invoice(amount_msat=12300, label='inv3', description='description') route = l1.rpc.getroute(l3.info['id'], 12300, 1)['route'] l1.rpc.sendpay(route, inv['payment_hash'], payment_secret=inv['payment_secret']) @@ -3772,3 +3772,18 @@ def test_sql(node_factory, bitcoind): with pytest.raises(RpcError, match='Unauthorized'): l2.rpc.sql("DELETE FROM forwards;") + + assert len(l3.rpc.sql("SELECT * FROM channels;")['rows']) == 4 + # Check that channels gets refreshed! + scid = l1.get_channel_scid(l2) + l1.rpc.setchannel(scid, feebase=123) + wait_for(lambda: l3.rpc.sql("SELECT short_channel_id FROM channels WHERE base_fee_millisatoshi = 123;")['rows'] == [[scid]]) + l3.daemon.wait_for_log("Refreshing channels...") + l3.daemon.wait_for_log("Refreshing channel: {}".format(scid)) + + # This has to wait for the hold_invoice plugin to let go! + l1.rpc.close(l2.info['id']) + bitcoind.generate_block(13, wait_for_mempool=1) + wait_for(lambda: len(l3.rpc.listchannels()['channels']) == 2) + assert len(l3.rpc.sql("SELECT * FROM channels;")['rows']) == 2 + l3.daemon.wait_for_log("Deleting channel: {}".format(scid))