From 29c6884468face50146b6f9a6c215d52c729f25d Mon Sep 17 00:00:00 2001 From: niftynei Date: Tue, 19 Jul 2022 17:04:35 +0930 Subject: [PATCH] bkpr: add journal entry for offset account balances; report listbalances When the node starts up, it records missing/updated account balances to the 'channel' events... which is kinda fucked for wallet + external events now that i think about it but these are all treated the same anyway so it's fine. This is the magic piece that lets your bookkeeping data startup ok on an already running/established node. --- plugins/bkpr/bookkeeper.c | 114 ++++++++++++++++++++++++++++++- plugins/bkpr/recorder.c | 97 ++++++++++++++++++++++++++ plugins/bkpr/recorder.h | 13 ++++ plugins/bkpr/test/run-recorder.c | 106 ++++++++++++++++++++++++++++ 4 files changed, 328 insertions(+), 2 deletions(-) diff --git a/plugins/bkpr/bookkeeper.c b/plugins/bkpr/bookkeeper.c index 91e9ceb88..338abdf8b 100644 --- a/plugins/bkpr/bookkeeper.c +++ b/plugins/bkpr/bookkeeper.c @@ -1,7 +1,9 @@ #include "config.h" #include +#include #include #include +#include #include #include #include @@ -26,11 +28,51 @@ static struct command_result *json_list_balances(struct command *cmd, const jsmntok_t *params) { struct json_stream *res; + struct account **accts; + char *err; if (!param(cmd, buf, params, NULL)) return command_param_failed(); res = jsonrpc_stream_success(cmd); + /* List of accts */ + db_begin_transaction(db); + accts = list_accounts(cmd, db); + + json_array_start(res, "accounts"); + for (size_t i = 0; i < tal_count(accts); i++) { + struct acct_balance **balances; + + err = account_get_balance(cmd, db, + accts[i]->name, + &balances); + + if (err) + plugin_err(cmd->plugin, + "Get account balance returned err" + " for account %s: %s", + accts[i]->name, err); + + /* Add it to the result data */ + json_object_start(res, NULL); + + json_add_string(res, "account_id", accts[i]->name); + json_array_start(res, "balances"); + for (size_t j = 0; j < tal_count(balances); j++) { + json_object_start(res, NULL); + json_add_amount_msat_only(res, "balance", + balances[j]->balance); + json_add_string(res, "coin_type", + balances[j]->currency); + json_object_end(res); + } + json_array_end(res); + + json_object_end(res); + } + json_array_end(res); + db_commit_transaction(db); + return command_finished(cmd, res); } @@ -79,8 +121,13 @@ static struct command_result *json_balance_snapshot(struct command *cmd, json_tok_full(buf, params)); snaps = tal_arr(cmd, struct account_snap, accounts_tok->size); + + db_begin_transaction(db); json_for_each_arr(i, acct_tok, accounts_tok) { + struct acct_balance **balances; + struct amount_msat balance; struct account_snap s = snaps[i]; + err = json_scan(cmd, buf, acct_tok, "{account_id:%" ",balance_msat:%" @@ -99,9 +146,72 @@ static struct command_result *json_balance_snapshot(struct command *cmd, plugin_log(cmd->plugin, LOG_DBG, "account %s has balance %s", s.name, type_to_string(tmpctx, struct amount_msat, &s.amt)); - } - // FIXME: check balances are ok! + /* Find the account and verify the balance */ + err = account_get_balance(cmd, db, s.name, + &balances); + + if (err) + plugin_err(cmd->plugin, + "Get account balance returned err" + " for account %s: %s", + s.name, err); + + /* FIXME: multiple currency balances */ + balance = AMOUNT_MSAT(0); + for (size_t j = 0; j < tal_count(balances); j++) { + bool ok; + ok = amount_msat_add(&balance, balance, + balances[j]->balance); + assert(ok); + } + + if (!amount_msat_eq(s.amt, balance)) { + struct account *acct; + struct channel_event ev; + + plugin_log(cmd->plugin, LOG_UNUSUAL, + "Snapshot balance does not equal ondisk" + " reported %s, on disk %s (account %s)." + " Logging journal entry.", + type_to_string(tmpctx, struct amount_msat, &s.amt), + type_to_string(tmpctx, struct amount_msat, &balance), + s.name); + + if (!amount_msat_sub(&ev.credit, s.amt, balance)) { + ev.credit = AMOUNT_MSAT(0); + if (!amount_msat_sub(&ev.debit, balance, s.amt)) + plugin_err(cmd->plugin, + "Unable to sub amt"); + } else + ev.debit = AMOUNT_MSAT(0); + + /* Log a channel "journal entry" to get + * the balances inline */ + acct = find_account(cmd, db, s.name); + if (!acct) { + plugin_log(cmd->plugin, LOG_INFORM, + "account %s not found, adding" + " along with new balance", + s.name); + /* FIXME: lookup peer id for channel? */ + acct = new_account(cmd, s.name, NULL); + account_add(db, acct); + } + + /* This is *not* a coin_mvt tag type */ + ev.tag = "journal_entry"; + ev.fees = AMOUNT_MSAT(0); + ev.currency = s.coin_type; + ev.part_id = 0; + memset(&ev.payment_id, 0, sizeof(struct sha256)); + /* Use current time for this */ + ev.timestamp = time_now().ts.tv_sec; + + log_channel_event(db, acct, &ev); + } + } + db_commit_transaction(db); return notification_handled(cmd); } diff --git a/plugins/bkpr/recorder.c b/plugins/bkpr/recorder.c index d48e5f210..481ccfe3f 100644 --- a/plugins/bkpr/recorder.c +++ b/plugins/bkpr/recorder.c @@ -187,6 +187,103 @@ static struct chain_event *find_chain_event(const tal_t *ctx, return e; } +char *account_get_balance(const tal_t *ctx, + struct db *db, + const char *acct_name, + struct acct_balance ***balances) +{ + struct db_stmt *stmt; + + stmt = db_prepare_v2(db, SQL("SELECT" + " CAST(SUM(ce.credit) AS BIGINT) as credit" + ", CAST(SUM(ce.debit) AS BIGINT) as debit" + ", ce.currency" + " FROM chain_events ce" + " LEFT OUTER JOIN accounts a" + " ON a.id = ce.account_id" + " WHERE a.name = ?" + " GROUP BY ce.currency")); + + db_bind_text(stmt, 0, acct_name); + db_query_prepared(stmt); + *balances = tal_arr(ctx, struct acct_balance *, 0); + + while (db_step(stmt)) { + struct acct_balance *bal; + + bal = tal(*balances, struct acct_balance); + + bal->currency = db_col_strdup(bal, stmt, "ce.currency"); + db_col_amount_msat(stmt, "credit", &bal->credit); + db_col_amount_msat(stmt, "debit", &bal->debit); + tal_arr_expand(balances, bal); + } + tal_free(stmt); + + stmt = db_prepare_v2(db, SQL("SELECT" + " CAST(SUM(ce.credit) AS BIGINT) as credit" + ", CAST(SUM(ce.debit) AS BIGINT) as debit" + ", ce.currency" + " FROM channel_events ce" + " LEFT OUTER JOIN accounts a" + " ON a.id = ce.account_id" + " WHERE a.name = ?" + " GROUP BY ce.currency")); + db_bind_text(stmt, 0, acct_name); + db_query_prepared(stmt); + + while (db_step(stmt)) { + struct amount_msat amt; + struct acct_balance *bal = NULL; + char *currency; + + currency = db_col_strdup(ctx, stmt, "ce.currency"); + + /* Find the currency entry from above */ + for (size_t i = 0; i < tal_count(*balances); i++) { + if (streq((*balances)[i]->currency, currency)) { + bal = (*balances)[i]; + break; + } + } + + if (!bal) { + bal = tal(*balances, struct acct_balance); + bal->credit = AMOUNT_MSAT(0); + bal->debit = AMOUNT_MSAT(0); + bal->currency = tal_steal(bal, currency); + tal_arr_expand(balances, bal); + } + + db_col_amount_msat(stmt, "credit", &amt); + if (!amount_msat_add(&bal->credit, bal->credit, amt)) { + tal_free(stmt); + return "overflow adding channel_event credits"; + } + + db_col_amount_msat(stmt, "debit", &amt); + if (!amount_msat_add(&bal->debit, bal->debit, amt)) { + tal_free(stmt); + return "overflow adding channel_event debits"; + } + } + tal_free(stmt); + + for (size_t i = 0; i < tal_count(*balances); i++) { + struct acct_balance *bal = (*balances)[i]; + if (!amount_msat_sub(&bal->balance, bal->credit, bal->debit)) + return tal_fmt(ctx, + "%s channel balance is negative? %s - %s", + bal->currency, + type_to_string(ctx, struct amount_msat, + &bal->credit), + type_to_string(ctx, struct amount_msat, + &bal->debit)); + } + + return NULL; +} + struct channel_event **account_get_channel_events(const tal_t *ctx, struct db *db, struct account *acct) diff --git a/plugins/bkpr/recorder.h b/plugins/bkpr/recorder.h index 888c349d3..449d4775b 100644 --- a/plugins/bkpr/recorder.h +++ b/plugins/bkpr/recorder.h @@ -12,6 +12,13 @@ struct db; enum mvt_tag; struct onchain_fee; +struct acct_balance { + char *currency; + struct amount_msat credit; + struct amount_msat debit; + struct amount_msat balance; +}; + /* Get all accounts */ struct account **list_accounts(const tal_t *ctx, struct db *db); @@ -30,6 +37,12 @@ struct chain_event **account_get_chain_events(const tal_t *ctx, struct db *db, struct account *acct); +/* Calculate the balances for an account */ +char *account_get_balance(const tal_t *ctx, + struct db *db, + const char *acct_name, + struct acct_balance ***balances); + /* List all chain fees, for all accounts */ struct onchain_fee **list_chain_fees(const tal_t *ctx, struct db *db); diff --git a/plugins/bkpr/test/run-recorder.c b/plugins/bkpr/test/run-recorder.c index 7ab02b79a..01ffa6ba3 100644 --- a/plugins/bkpr/test/run-recorder.c +++ b/plugins/bkpr/test/run-recorder.c @@ -290,6 +290,25 @@ static bool chain_events_eq(struct chain_event *e1, struct chain_event *e2) return true; } +static struct channel_event *make_channel_event(const tal_t *ctx, + char *tag, + struct amount_msat credit, + struct amount_msat debit, + char payment_char) +{ + struct channel_event *ev = tal(ctx, struct channel_event); + + memset(&ev->payment_id, payment_char, sizeof(struct sha256)); + ev->credit = credit; + ev->debit = debit; + ev->fees = AMOUNT_MSAT(104); + ev->currency = "btc"; + ev->timestamp = 1919191; + ev->part_id = 19; + ev->tag = tag; + return ev; +} + static struct chain_event *make_chain_event(const tal_t *ctx, char *tag, struct amount_msat credit, @@ -827,6 +846,92 @@ static bool test_chain_event_crud(const tal_t *ctx, struct plugin *p) return true; } +static bool test_account_balances(const tal_t *ctx, struct plugin *p) +{ + struct db *db = db_setup(ctx, p, tmp_dsn(ctx)); + struct node_id peer_id; + struct account *acct, *acct2; + struct chain_event *ev1; + struct acct_balance **balances; + char *err; + + memset(&peer_id, 3, sizeof(struct node_id)); + + acct = new_account(ctx, tal_fmt(ctx, "example"), &peer_id); + acct2 = new_account(ctx, tal_fmt(ctx, "wallet"), &peer_id); + + db_begin_transaction(db); + account_add(db, acct); + account_add(db, acct2); + + /* +1000btc */ + log_chain_event(db, acct, + make_chain_event(ctx, "one", + AMOUNT_MSAT(1000), + AMOUNT_MSAT(0), + 'A', 1, '*')); + + /* -999btc */ + log_chain_event(db, acct, + make_chain_event(ctx, "two", + AMOUNT_MSAT(0), + AMOUNT_MSAT(999), + 'A', 2, '*')); + + /* -440btc */ + log_channel_event(db, acct, + make_channel_event(ctx, "chan", + AMOUNT_MSAT(0), + AMOUNT_MSAT(440), + 'C')); + + /* 500btc */ + log_channel_event(db, acct, + make_channel_event(ctx, "chan", + AMOUNT_MSAT(500), + AMOUNT_MSAT(0), + 'D')); + + /* +5000chf */ + ev1 = make_chain_event(ctx, "two", AMOUNT_MSAT(5000), AMOUNT_MSAT(0), + 'A', 3, '*'); + ev1->currency = "chf"; + log_chain_event(db, acct, ev1); + + /* Add same chain event to a different account, shouldn't show */ + log_chain_event(db, acct2, ev1); + + err = account_get_balance(ctx, db, acct->name, + &balances); + CHECK_MSG(!err, err); + db_commit_transaction(db); + CHECK_MSG(!db_err, db_err); + + /* Should have 2 balances */ + CHECK(tal_count(balances) == 2); + CHECK(streq(balances[0]->currency, "btc")); + CHECK(amount_msat_eq(balances[0]->balance, AMOUNT_MSAT(500 - 440 + 1))); + CHECK(streq(balances[1]->currency, "chf")); + CHECK(amount_msat_eq(balances[1]->balance, AMOUNT_MSAT(5000))); + + /* Should error if account balance is negative */ + db_begin_transaction(db); + /* -5001chf */ + ev1 = make_chain_event(ctx, "two", + AMOUNT_MSAT(0), AMOUNT_MSAT(5001), + 'A', 4, '*'); + ev1->currency = "chf"; + log_chain_event(db, acct, ev1); + + err = account_get_balance(ctx, db, acct->name, + &balances); + CHECK_MSG(err != NULL, "Expected err message"); + CHECK(streq(err, "chf channel balance is negative? 5000msat - 5001msat")); + db_commit_transaction(db); + + return true; +} + static bool test_account_crud(const tal_t *ctx, struct plugin *p) { struct db *db = db_setup(ctx, p, tmp_dsn(ctx)); @@ -940,6 +1045,7 @@ int main(int argc, char *argv[]) ok &= test_account_crud(tmpctx, plugin); ok &= test_channel_event_crud(tmpctx, plugin); ok &= test_chain_event_crud(tmpctx, plugin); + ok &= test_account_balances(tmpctx, plugin); ok &= test_onchain_fee_chan_close(tmpctx, plugin); ok &= test_onchain_fee_chan_open(tmpctx, plugin); ok &= test_onchain_fee_wallet_spend(tmpctx, plugin);