bkpr: have onchain_fee records be write-only, don't update in place

One really rough thing about how we did onchain fees is that the records update
every time a new event comes in.

The better way to do this is to create new entries for every adjustment,
so that reconciliation between printouts isn't a misery.

We add a timestamp and `update_count` to these records, so you can
roughly order them now (and have a good idea of the last time an event
that updated an onchain_fee occurred).
This commit is contained in:
niftynei
2022-07-19 17:04:35 +09:30
committed by Rusty Russell
parent 29c6884468
commit 899d54edd0
4 changed files with 162 additions and 88 deletions

View File

@@ -84,9 +84,12 @@ static struct migration db_migrations[] = {
{SQL("CREATE TABLE onchain_fees ("
"account_id BIGINT REFERENCES accounts(id)"
", txid BLOB"
", amount BIGINT"
", credit BIGINT"
", debit BIGINT"
", currency TEXT"
", PRIMARY KEY (account_id, txid)"
", timestamp BIGINT"
", update_count INT"
", PRIMARY KEY (account_id, txid, update_count)"
");"),
NULL},
};

View File

@@ -15,11 +15,18 @@ struct onchain_fee {
/* Transaction that we're recording fees for */
struct bitcoin_txid txid;
/* Total amount of onchain fees we paid for this txid */
struct amount_msat amount;
/* Incremental change in onchain fees */
struct amount_msat credit;
struct amount_msat debit;
/* What token are fees? */
char *currency;
/* Timestamp of the event that created this fee update */
u64 timestamp;
/* Count of records we've recorded for this tx */
u32 update_count;
};
#endif /* LIGHTNING_PLUGINS_BKPR_ONCHAIN_FEE_H */

View File

@@ -325,8 +325,11 @@ static struct onchain_fee *stmt2onchain_fee(const tal_t *ctx,
of->acct_db_id = db_col_u64(stmt, "account_id");
db_col_txid(stmt, "txid", &of->txid);
db_col_amount_msat(stmt, "amount", &of->amount);
db_col_amount_msat(stmt, "credit", &of->credit);
db_col_amount_msat(stmt, "debit", &of->debit);
of->currency = db_col_strdup(of, stmt, "currency");
of->timestamp = db_col_u64(stmt, "timestamp");
of->update_count = db_col_int(stmt, "update_count");
return of;
}
@@ -339,10 +342,15 @@ struct onchain_fee **list_chain_fees(const tal_t *ctx, struct db *db)
stmt = db_prepare_v2(db, SQL("SELECT"
" account_id"
", txid"
", amount"
", credit"
", debit"
", currency"
", timestamp"
", update_count"
" FROM onchain_fees"
" ORDER BY account_id"));
" ORDER BY account_id"
", txid"
", update_count"));
db_query_prepared(stmt);
results = tal_arr(ctx, struct onchain_fee *, 0);
@@ -434,8 +442,11 @@ struct onchain_fee **account_onchain_fees(const tal_t *ctx,
stmt = db_prepare_v2(db, SQL("SELECT"
" account_id"
", txid"
", amount"
", credit"
", debit"
", currency"
", timestamp"
", update_count"
" FROM onchain_fees"
" WHERE account_id = ?;"));
@@ -678,58 +689,81 @@ static u64 find_acct_id(struct db *db, const char *name)
return acct_id;
}
static void update_or_insert_chain_fees(struct db *db,
u64 acct_id,
struct bitcoin_txid *txid,
struct amount_msat *amount,
const char *currency)
static void insert_chain_fees_diff(struct db *db,
u64 acct_id,
struct bitcoin_txid *txid,
struct amount_msat amount,
const char *currency,
u64 timestamp)
{
struct db_stmt *stmt;
u32 update_count;
struct amount_msat current_amt, credit, debit;
/* First, look to see if there's an already existing
* record to update */
stmt = db_prepare_v2(db, SQL("SELECT"
" 1"
" update_count"
", credit"
", debit"
" FROM onchain_fees"
" WHERE txid = ?"
" AND account_id = ?"));
" AND account_id = ?"
" ORDER BY update_count"));
db_bind_txid(stmt, 0, txid);
db_bind_u64(stmt, 1, acct_id);
db_query_prepared(stmt);
/* If there's no current record, add it */
if (!db_step(stmt)) {
tal_free(stmt);
current_amt = AMOUNT_MSAT(0);
update_count = 0;
while (db_step(stmt)) {
update_count = db_col_int(stmt, "update_count");
db_col_amount_msat(stmt, "credit", &credit);
db_col_amount_msat(stmt, "debit", &debit);
stmt = db_prepare_v2(db, SQL("INSERT INTO onchain_fees"
" ("
" account_id"
", txid"
", amount"
", currency"
") VALUES"
" (?, ?, ?, ?);"));
/* These should apply perfectly, as we sorted them by
* insert order */
if (!amount_msat_add(&current_amt, current_amt, credit))
db_fatal("Overflow when adding onchain fees");
if (!amount_msat_sub(&current_amt, current_amt, debit))
db_fatal("Underflow when subtracting onchain fees");
db_bind_u64(stmt, 0, acct_id);
db_bind_txid(stmt, 1, txid);
db_bind_amount_msat(stmt, 2, amount);
db_bind_text(stmt, 3, currency);
db_exec_prepared_v2(take(stmt));
return;
}
/* Otherwise, we update the existing record */
db_col_ignore(stmt, "1");
tal_free(stmt);
stmt = db_prepare_v2(db, SQL("UPDATE onchain_fees SET"
" amount = ?"
" WHERE txid = ?"
" AND account_id = ?"));
db_bind_amount_msat(stmt, 0, amount);
/* If they're already equal, no need to update */
if (amount_msat_eq(current_amt, amount))
return;
if (!amount_msat_sub(&credit, amount, current_amt)) {
credit = AMOUNT_MSAT(0);
if (!amount_msat_sub(&debit, current_amt, amount))
db_fatal("shouldn't happen, unable to subtract");
} else
debit = AMOUNT_MSAT(0);
stmt = db_prepare_v2(db, SQL("INSERT INTO onchain_fees"
" ("
" account_id"
", txid"
", credit"
", debit"
", currency"
", timestamp"
", update_count"
") VALUES"
" (?, ?, ?, ?, ?, ?, ?);"));
db_bind_u64(stmt, 0, acct_id);
db_bind_txid(stmt, 1, txid);
db_bind_u64(stmt, 2, acct_id);
db_bind_amount_msat(stmt, 2, &credit);
db_bind_amount_msat(stmt, 3, &debit);
db_bind_text(stmt, 4, currency);
db_bind_u64(stmt, 5, timestamp);
db_bind_int(stmt, 6, ++update_count);
db_exec_prepared_v2(take(stmt));
}
@@ -872,9 +906,9 @@ char *maybe_update_onchain_fees(const tal_t *ctx, struct db *db,
fees = fee_part_msat;
/* FIXME: fee_currency property of acct? */
update_or_insert_chain_fees(db, last_id,
txid, &fees,
events[i]->currency);
insert_chain_fees_diff(db, last_id, txid, fees,
events[i]->currency,
events[i]->timestamp);
}

View File

@@ -454,7 +454,9 @@ static bool test_onchain_fee_chan_close(const tal_t *ctx, struct plugin *p)
CHECK(tal_count(ofs) == 1);
CHECK(ofs[0]->acct_db_id == acct->db_id);
CHECK(amount_msat_eq(ofs[0]->amount, AMOUNT_MSAT(800)));
CHECK(amount_msat_eq(ofs[0]->credit, AMOUNT_MSAT(800)));
CHECK(amount_msat_zero(ofs[0]->debit));
CHECK(ofs[0]->update_count == 1);
log_chain_event(db, acct,
make_chain_event(ctx, "htlc_tx",
@@ -491,7 +493,7 @@ static bool test_onchain_fee_chan_close(const tal_t *ctx, struct plugin *p)
db_commit_transaction(db);
CHECK_MSG(!db_err, db_err);
/* Expect: 2 onchain fee records, all for chan-1 */
/* Expect: 3 onchain fee records, all for chan-1 */
db_begin_transaction(db);
ofs = list_chain_fees(ctx, db);
ofs1 = account_onchain_fees(ctx, db, acct);
@@ -499,7 +501,7 @@ static bool test_onchain_fee_chan_close(const tal_t *ctx, struct plugin *p)
CHECK_MSG(!db_err, db_err);
CHECK(tal_count(ofs) == tal_count(ofs1));
CHECK(tal_count(ofs) == 2);
CHECK(tal_count(ofs) == 3);
/* txid 3333 */
db_begin_transaction(db);
@@ -527,11 +529,11 @@ static bool test_onchain_fee_chan_close(const tal_t *ctx, struct plugin *p)
CHECK_MSG(!db_err, db_err);
CHECK(tal_count(ofs) == tal_count(ofs1));
CHECK(tal_count(ofs) == 3);
CHECK(tal_count(ofs) == 4);
/* Expect: fees as follows
*
* chan-1, 1111, 200
* chan-1, 1111, 800,-600
* chan-1, 3333, 100
* chan-1, 2222, 50
*/
@@ -541,17 +543,31 @@ static bool test_onchain_fee_chan_close(const tal_t *ctx, struct plugin *p)
memset(&txid, '1', sizeof(struct bitcoin_txid));
if (bitcoin_txid_eq(&txid, &ofs[i]->txid)) {
CHECK(200 == ofs[i]->amount.millisatoshis); /* Raw: test eq */
CHECK(ofs[i]->update_count == 1
|| ofs[i]->update_count == 2);
if (ofs[i]->update_count == 1) {
CHECK(amount_msat_eq(ofs[i]->credit, AMOUNT_MSAT(800)));
CHECK(amount_msat_zero(ofs[i]->debit));
} else {
CHECK(amount_msat_eq(ofs[i]->debit, AMOUNT_MSAT(600)));
CHECK(amount_msat_zero(ofs[i]->credit));
}
continue;
}
memset(&txid, '2', sizeof(struct bitcoin_txid));
if (bitcoin_txid_eq(&txid, &ofs[i]->txid)) {
CHECK(50 == ofs[i]->amount.millisatoshis); /* Raw: test eq */
CHECK(ofs[i]->update_count == 1);
CHECK(amount_msat_eq(ofs[i]->credit, AMOUNT_MSAT(50)));
CHECK(amount_msat_zero(ofs[i]->debit));
continue;
}
memset(&txid, '3', sizeof(struct bitcoin_txid));
if (bitcoin_txid_eq(&txid, &ofs[i]->txid)) {
CHECK(100 == ofs[i]->amount.millisatoshis); /* Raw: test eq */
CHECK(ofs[i]->update_count == 1);
CHECK(amount_msat_eq(ofs[i]->credit, AMOUNT_MSAT(100)));
CHECK(amount_msat_zero(ofs[i]->debit));
continue;
}
@@ -591,37 +607,13 @@ static bool test_onchain_fee_chan_open(const tal_t *ctx, struct plugin *p)
/* Open two channels from wallet */
/* tag utxo_id vout txid debits credits acct_id
* withd XXXX 0 AAAA 1000 wallet
* withd YYYY 0 AAAA 3000 wallet
* withd YYYY 0 AAAA 3001 wallet
* open AAAA 0 500 chan-1
* open AAAA 1 1000 chan-2
* depo AAAA 2 2200 wallet
*/
memset(&txid, 'A', sizeof(struct bitcoin_txid));
db_begin_transaction(db);
log_chain_event(db, acct,
make_chain_event(ctx, "deposit",
AMOUNT_MSAT(500),
AMOUNT_MSAT(0),
'A', 0, '*'));
log_chain_event(db, acct2,
make_chain_event(ctx, "deposit",
AMOUNT_MSAT(1000),
AMOUNT_MSAT(0),
'A', 1, '*'));
log_chain_event(db, wal_acct,
make_chain_event(ctx, "deposit",
AMOUNT_MSAT(2200),
AMOUNT_MSAT(0),
'A', 2, '*'));
maybe_update_onchain_fees(ctx, db, &txid);
/* Should be no fee records yet! */
ofs = list_chain_fees(ctx, db);
CHECK_MSG(tal_count(ofs) == 0,
"no fees counted yet");
log_chain_event(db, wal_acct,
make_chain_event(ctx, "withdrawal",
AMOUNT_MSAT(0),
@@ -633,29 +625,67 @@ static bool test_onchain_fee_chan_open(const tal_t *ctx, struct plugin *p)
AMOUNT_MSAT(3001),
'Y', 0, 'A'));
log_chain_event(db, acct,
make_chain_event(ctx, "deposit",
AMOUNT_MSAT(500),
AMOUNT_MSAT(0),
'A', 0, '*'));
maybe_update_onchain_fees(ctx, db, &txid);
log_chain_event(db, acct2,
make_chain_event(ctx, "deposit",
AMOUNT_MSAT(1000),
AMOUNT_MSAT(0),
'A', 1, '*'));
maybe_update_onchain_fees(ctx, db, &txid);
log_chain_event(db, wal_acct,
make_chain_event(ctx, "deposit",
AMOUNT_MSAT(2200),
AMOUNT_MSAT(0),
'A', 2, '*'));
maybe_update_onchain_fees(ctx, db, &txid);
maybe_update_onchain_fees(ctx, db, &txid);
db_commit_transaction(db);
CHECK_MSG(!db_err, db_err);
/* Expect: 2 onchain fee records of 151/150msat ea,
/* Expect: 5 onchain fee records, totaling to 151/150msat ea,
* none for wallet */
db_begin_transaction(db);
ofs = list_chain_fees(ctx, db);
db_commit_transaction(db);
CHECK_MSG(!db_err, db_err);
CHECK(tal_count(ofs) == 2);
/* Since these are sorted by acct_id on fetch,
* this *should* be stable */
CHECK(ofs[0]->acct_db_id == acct->db_id);
CHECK(amount_msat_eq(ofs[0]->amount, AMOUNT_MSAT(151)));
CHECK(streq(ofs[0]->currency, "btc"));
CHECK(bitcoin_txid_eq(&ofs[0]->txid, &txid));
CHECK(tal_count(ofs) == 5);
CHECK(ofs[1]->acct_db_id == acct2->db_id);
CHECK(amount_msat_eq(ofs[1]->amount, AMOUNT_MSAT(150)));
CHECK(streq(ofs[1]->currency, "btc"));
CHECK(bitcoin_txid_eq(&ofs[1]->txid, &txid));
struct exp_result {
u32 credit;
u32 debit;
u32 update_count;
};
struct exp_result exp_results[] = {
{ .credit = 3501, .debit = 0, .update_count = 1 },
{ .credit = 0, .debit = 2250, .update_count = 2 },
{ .credit = 0, .debit = 1100, .update_count = 3 },
{ .credit = 1250, .debit = 0, .update_count = 1 },
{ .credit = 0, .debit = 1100, .update_count = 2 },
};
/* Since these are sorted on fetch,
* this *should* be stable */
for (size_t i = 0; i < tal_count(ofs); i++) {
CHECK(i < ARRAY_SIZE(exp_results));
CHECK(amount_msat_eq(ofs[i]->credit,
amount_msat(exp_results[i].credit)));
CHECK(amount_msat_eq(ofs[i]->debit,
amount_msat(exp_results[i].debit)));
CHECK(ofs[i]->update_count == exp_results[i].update_count);
CHECK(streq(ofs[i]->currency, "btc"));
CHECK(bitcoin_txid_eq(&ofs[i]->txid, &txid));
}
return true;
}