#include "config.h"
/* NOTE(review): the header names on the #include lines below were missing
 * from the text under review.  The list is reconstructed from the symbols
 * this file uses -- confirm it against the build before merging. */
#include <ccan/array_size/array_size.h>
#include <ccan/time/time.h>
#include <common/coin_mvt.h>
#include <common/json_param.h>
#include <common/json_stream.h>
#include <common/memleak.h>
#include <common/type_to_string.h>
#include <db/exec.h>
#include <plugins/bkpr/account.h>
#include <plugins/bkpr/chain_event.h>
#include <plugins/bkpr/channel_event.h>
#include <plugins/bkpr/db.h>
#include <plugins/bkpr/recorder.h>
#include <plugins/libplugin.h>

/* Values of the `type` field of a `coin_movement` notification. */
#define CHAIN_MOVE "chain_mvt"
#define CHANNEL_MOVE "channel_mvt"

/* The database that we store all the accounting data in */
static struct db *db ;

// FIXME: make relative to directory we're loaded into
static char *db_dsn = "sqlite3://accounts.sqlite3";

/* RPC handler for `listbalances`.
 *
 * Takes no parameters.  Inside a single database transaction it lists every
 * account and, for each one, emits an object with its `account_id` and a
 * `balances` array of {`balance`, `coin_type`} entries.
 *
 * A failure to fetch an account's balance is reported through plugin_err(),
 * which this file relies on not to return. */
static struct command_result *json_list_balances(struct command *cmd,
						 const char *buf,
						 const jsmntok_t *params)
{
	struct json_stream *res;
	struct account **accts;
	char *err;

	if (!param(cmd, buf, params, NULL))
		return command_param_failed();

	res = jsonrpc_stream_success(cmd);
	/* List of accts */
	db_begin_transaction(db);
	accts = list_accounts(cmd, db);
	json_array_start(res, "accounts");
	for (size_t i = 0; i < tal_count(accts); i++) {
		struct acct_balance **balances;

		err = account_get_balance(cmd, db,
					  accts[i]->name,
					  &balances);

		if (err)
			plugin_err(cmd->plugin,
				   "Get account balance returned err"
				   " for account %s: %s",
				   accts[i]->name, err);

		/* Add it to the result data */
		json_object_start(res, NULL);
		json_add_string(res, "account_id", accts[i]->name);
		json_array_start(res, "balances");
		for (size_t j = 0; j < tal_count(balances); j++) {
			json_object_start(res, NULL);
			json_add_amount_msat_only(res, "balance",
						  balances[j]->balance);
			json_add_string(res, "coin_type",
					balances[j]->currency);
			json_object_end(res);
		}
		json_array_end(res);
		json_object_end(res);
	}
	json_array_end(res);
	db_commit_transaction(db);

	return command_finished(cmd, res);
}

/* One entry of the `accounts` array in a `balance_snapshot` notification. */
struct account_snap {
	char *name;
	struct amount_msat amt;
	char *coin_type;
};

/* Notification handler for `balance_snapshot`.
 *
 * For every account in the snapshot, sums the balances we have on record
 * and compares the total against the reported `balance_msat`.  If they
 * differ, a "journal_entry" channel event carrying the difference (as a
 * credit or a debit) is logged, creating the account first if it is not
 * yet known.
 *
 * `blockheight` and `timestamp` must be present in the payload, but only
 * their presence is checked here: journal entries are stamped with the
 * current time.
 *
 * Any malformed payload is reported through plugin_err(). */
static struct command_result *json_balance_snapshot(struct command *cmd,
						    const char *buf,
						    const jsmntok_t *params)
{
	const char *err;
	size_t i;
	u32 blockheight;
	u64 timestamp;
	const jsmntok_t *accounts_tok, *acct_tok,
	      *snap_tok = json_get_member(buf, params, "balance_snapshot");

	if (snap_tok == NULL || snap_tok->type != JSMN_OBJECT)
		plugin_err(cmd->plugin,
			   "`balance_snapshot` payload did not scan %s: %.*s",
			   "no 'balance_snapshot'",
			   json_tok_full_len(params),
			   json_tok_full(buf, params));

	err = json_scan(cmd, buf, snap_tok,
			"{blockheight:%"
			",timestamp:%}",
			JSON_SCAN(json_to_number, &blockheight),
			JSON_SCAN(json_to_u64, &timestamp));

	if (err)
		plugin_err(cmd->plugin,
			   "`balance_snapshot` payload did not scan %s: %.*s",
			   err, json_tok_full_len(params),
			   json_tok_full(buf, params));

	accounts_tok = json_get_member(buf, snap_tok, "accounts");
	if (accounts_tok == NULL || accounts_tok->type != JSMN_ARRAY)
		plugin_err(cmd->plugin,
			   "`balance_snapshot` payload did not scan %s: %.*s",
			   "no 'balance_snapshot.accounts'",
			   json_tok_full_len(params),
			   json_tok_full(buf, params));

	db_begin_transaction(db);
	json_for_each_arr(i, acct_tok, accounts_tok) {
		struct acct_balance **balances;
		struct amount_msat balance;
		/* Filled in entirely by the json_scan below. */
		struct account_snap s;

		err = json_scan(cmd, buf, acct_tok,
				"{account_id:%"
				",balance_msat:%"
				",coin_type:%}",
				JSON_SCAN_TAL(tmpctx, json_strdup, &s.name),
				JSON_SCAN(json_to_msat, &s.amt),
				JSON_SCAN_TAL(tmpctx, json_strdup,
					      &s.coin_type));
		if (err)
			plugin_err(cmd->plugin,
				   "`balance_snapshot` payload did not"
				   " scan %s: %.*s",
				   err, json_tok_full_len(params),
				   json_tok_full(buf, params));

		plugin_log(cmd->plugin, LOG_DBG, "account %s has balance %s",
			   s.name,
			   type_to_string(tmpctx, struct amount_msat, &s.amt));

		/* Find the account and verify the balance */
		err = account_get_balance(cmd, db, s.name,
					  &balances);
		if (err)
			plugin_err(cmd->plugin,
				   "Get account balance returned err"
				   " for account %s: %s",
				   s.name, err);

		/* FIXME: multiple currency balances */
		balance = AMOUNT_MSAT(0);
		for (size_t j = 0; j < tal_count(balances); j++) {
			if (!amount_msat_add(&balance, balance,
					     balances[j]->balance))
				plugin_err(cmd->plugin,
					   "Overflow adding up balances"
					   " for account %s", s.name);
		}

		if (!amount_msat_eq(s.amt, balance)) {
			struct account *acct;
			struct channel_event ev;

			plugin_log(cmd->plugin, LOG_UNUSUAL,
				   "Snapshot balance does not equal ondisk"
				   " reported %s, on disk %s (account %s)."
				   " Logging journal entry.",
				   type_to_string(tmpctx, struct amount_msat,
						  &s.amt),
				   type_to_string(tmpctx, struct amount_msat,
						  &balance),
				   s.name);

			/* Reported > on disk: credit the difference.
			 * Otherwise debit it. */
			if (!amount_msat_sub(&ev.credit, s.amt, balance)) {
				ev.credit = AMOUNT_MSAT(0);
				if (!amount_msat_sub(&ev.debit, balance,
						     s.amt))
					plugin_err(cmd->plugin,
						   "Unable to sub amt");
			} else
				ev.debit = AMOUNT_MSAT(0);

			/* Log a channel "journal entry" to get
			 * the balances inline */
			acct = find_account(cmd, db, s.name);
			if (!acct) {
				plugin_log(cmd->plugin, LOG_INFORM,
					   "account %s not found, adding"
					   " along with new balance",
					   s.name);

				/* FIXME: lookup peer id for channel? */
				acct = new_account(cmd, s.name, NULL);
				account_add(db, acct);
			}

			/* This is *not* a coin_mvt tag type */
			ev.tag = "journal_entry";
			ev.fees = AMOUNT_MSAT(0);
			ev.currency = s.coin_type;
			ev.part_id = 0;
			ev.payment_id = NULL;
			/* Use current time for this */
			ev.timestamp = time_now().ts.tv_sec;

			log_channel_event(db, acct, &ev);
		}
	}
	db_commit_transaction(db);

	return notification_handled(cmd);
}

/* Parses the chain-specific fields of a `coin_movement` payload and records
 * the resulting chain event.
 *
 * `utxo_txid`, `vout`, `output_msat` and `blockheight` are mandatory;
 * `txid` (the spending txid) and `payment_hash` are optional and left NULL
 * on the event when absent.  The account is created if it does not exist.
 *
 * `tags` must hold at least one element: the first one names the event.
 *
 * Returns NULL on success, or an error string if the mandatory fields did
 * not scan or if maybe_update_onchain_fees() reported an error.  In the
 * latter case the transaction has already been committed. */
static const char *parse_and_log_chain_move(struct command *cmd,
					    const char *buf,
					    const jsmntok_t *params,
					    const char *acct_name STEALS,
					    const struct amount_msat credit,
					    const struct amount_msat debit,
					    const char *coin_type STEALS,
					    const u64 timestamp,
					    const enum mvt_tag *tags)
{
	struct chain_event *e = tal(cmd, struct chain_event);
	struct sha256 *payment_hash = tal(cmd, struct sha256);
	struct bitcoin_txid *spending_txid = tal(cmd, struct bitcoin_txid);
	struct account *acct;
	const char *err;

	/* Fields we expect on *every* chain movement */
	err = json_scan(tmpctx, buf, params,
			"{coin_movement:"
			"{utxo_txid:%"
			",vout:%"
			",output_msat:%"
			",blockheight:%"
			"}}",
			JSON_SCAN(json_to_txid, &e->outpoint.txid),
			JSON_SCAN(json_to_number, &e->outpoint.n),
			JSON_SCAN(json_to_msat, &e->output_value),
			JSON_SCAN(json_to_number, &e->blockheight));

	if (err)
		return err;

	/* Now try to get out the optional parts */
	err = json_scan(tmpctx, buf, params,
			"{coin_movement:"
			"{txid:%"
			"}}",
			JSON_SCAN(json_to_txid, spending_txid));

	/* A scan failure just means the field is absent */
	if (err)
		spending_txid = tal_free(spending_txid);

	e->spending_txid = tal_steal(e, spending_txid);

	/* Now try to get out the optional parts */
	err = json_scan(tmpctx, buf, params,
			"{coin_movement:"
			"{payment_hash:%"
			"}}",
			JSON_SCAN(json_to_sha256, payment_hash));

	if (err)
		payment_hash = tal_free(payment_hash);

	e->payment_id = tal_steal(e, payment_hash);

	e->credit = credit;
	e->debit = debit;
	e->currency = tal_steal(e, coin_type);
	e->timestamp = timestamp;
	e->tag = mvt_tag_str(tags[0]);

	db_begin_transaction(db);
	acct = find_account(cmd, db, acct_name);

	if (!acct) {
		/* FIXME: lookup the peer id for this channel! */
		acct = new_account(cmd, acct_name, NULL);
		account_add(db, acct);
	}

	log_chain_event(db, acct, e);

	/* This event *might* have implications for account;
	 * update as necessary */
	maybe_update_account(db, acct, e, tags);

	/* Can we calculate any onchain fees now? */
	err = maybe_update_onchain_fees(cmd, db,
					e->spending_txid ?
					e->spending_txid :
					&e->outpoint.txid);

	db_commit_transaction(db);

	if (err)
		return err;

	/* FIXME: maybe mark channel as 'onchain_resolved' */

	return NULL;
}

/* Parses the channel-specific fields of a `coin_movement` payload and
 * records the resulting channel event.
 *
 * `payment_hash`, `part_id` and `fees_msat` are all optional: when absent
 * they default to NULL, 0 and 0 msat respectively.
 *
 * `tags` must hold at least one element: the first one names the event.
 *
 * The account must already exist; otherwise plugin_err() is called.
 * Always returns NULL. */
static const char *parse_and_log_channel_move(struct command *cmd,
					      const char *buf,
					      const jsmntok_t *params,
					      const char *acct_name STEALS,
					      const struct amount_msat credit,
					      const struct amount_msat debit,
					      const char *coin_type STEALS,
					      const u64 timestamp,
					      const enum mvt_tag *tags)
{
	struct channel_event *e = tal(cmd, struct channel_event);
	struct account *acct;
	const char *err;

	e->payment_id = tal(e, struct sha256);
	err = json_scan(tmpctx, buf, params,
			"{coin_movement:{payment_hash:%}}",
			JSON_SCAN(json_to_sha256, e->payment_id));
	if (err)
		e->payment_id = tal_free(e->payment_id);

	err = json_scan(tmpctx, buf, params,
			"{coin_movement:{part_id:%}}",
			JSON_SCAN(json_to_number, &e->part_id));
	if (err)
		e->part_id = 0;

	err = json_scan(tmpctx, buf, params,
			"{coin_movement:{fees_msat:%}}",
			JSON_SCAN(json_to_msat, &e->fees));
	if (err)
		e->fees = AMOUNT_MSAT(0);

	e->credit = credit;
	e->debit = debit;
	e->currency = tal_steal(e, coin_type);
	e->timestamp = timestamp;
	e->tag = mvt_tag_str(tags[0]);

	/* Go find the account for this event */
	db_begin_transaction(db);
	acct = find_account(cmd, db, acct_name);
	if (!acct)
		plugin_err(cmd->plugin,
			   "Received channel event,"
			   " but no account exists %s",
			   acct_name);

	log_channel_event(db, acct, e);
	db_commit_transaction(db);

	return NULL;
}

/* Extracts the `tags` array member of `tok` into a newly allocated array.
 *
 * On success returns NULL and sets `*tags` to an array (allocated off
 * `ctx`) with one entry per element of the JSON array.  Returns a static
 * error string if the member is missing, is not an array, is empty, or
 * contains an element that is not a known tag.
 *
 * An empty array is refused because the callers in this file
 * unconditionally read the first tag. */
static char *parse_tags(const tal_t *ctx,
			const char *buf,
			const jsmntok_t *tok,
			enum mvt_tag **tags)
{
	size_t i;
	const jsmntok_t *tag_tok,
	      *tags_tok = json_get_member(buf, tok, "tags");

	if (tags_tok == NULL || tags_tok->type != JSMN_ARRAY)
		return "Invalid/missing 'tags' field";

	if (tags_tok->size == 0)
		return "Empty 'tags' field";

	*tags = tal_arr(ctx, enum mvt_tag, tags_tok->size);
	json_for_each_arr(i, tag_tok, tags_tok) {
		if (!json_to_coin_mvt_tag(buf, tag_tok, &(*tags)[i]))
			return "Unable to parse 'tags'";
	}

	return NULL;
}

/* Notification handler for `coin_movement`.
 *
 * Scans the fields common to every movement, parses the tags, then hands
 * over to the chain or channel specific helper depending on `type`.
 *
 * Only version 2 payloads are accepted.  Any malformed payload, unsupported
 * version or unknown movement type is reported through plugin_err(). */
static struct command_result * json_coin_moved(struct command *cmd,
					       const char *buf,
					       const jsmntok_t *params)
{
	const char *err, *mvt_type, *acct_name, *coin_type;
	u32 version;
	u64 timestamp;
	struct amount_msat credit, debit;
	enum mvt_tag *tags;

	err = json_scan(tmpctx, buf, params,
			"{coin_movement:"
			"{version:%"
			",type:%"
			",account_id:%"
			",credit_msat:%"
			",debit_msat:%"
			",coin_type:%"
			",timestamp:%"
			"}}",
			JSON_SCAN(json_to_number, &version),
			JSON_SCAN_TAL(tmpctx, json_strdup, &mvt_type),
			JSON_SCAN_TAL(tmpctx, json_strdup, &acct_name),
			JSON_SCAN(json_to_msat, &credit),
			JSON_SCAN(json_to_msat, &debit),
			JSON_SCAN_TAL(tmpctx, json_strdup, &coin_type),
			JSON_SCAN(json_to_u64, &timestamp));

	if (err)
		plugin_err(cmd->plugin,
			   "`coin_movement` payload did not scan %s: %.*s",
			   err, json_tok_full_len(params),
			   json_tok_full(buf, params));

	err = parse_tags(cmd, buf,
			 json_get_member(buf, params, "coin_movement"),
			 &tags);
	if (err)
		plugin_err(cmd->plugin,
			   "`coin_movement` payload did not scan %s: %.*s",
			   err, json_tok_full_len(params),
			   json_tok_full(buf, params));

	/* We expect version 2 of coin movements.  This is external input,
	 * so check it explicitly rather than with assert(). */
	if (version != 2)
		plugin_err(cmd->plugin,
			   "`coin_movement` payload did not scan %s: %.*s",
			   "unsupported 'version'",
			   json_tok_full_len(params),
			   json_tok_full(buf, params));

	plugin_log(cmd->plugin, LOG_DBG, "coin_move %d %s -%s %s %"PRIu64,
		   version,
		   type_to_string(tmpctx, struct amount_msat, &credit),
		   type_to_string(tmpctx, struct amount_msat, &debit),
		   mvt_type, timestamp);

	if (streq(mvt_type, CHAIN_MOVE))
		err = parse_and_log_chain_move(cmd, buf, params,
					       acct_name, credit, debit,
					       coin_type, timestamp, tags);
	else if (streq(mvt_type, CHANNEL_MOVE))
		err = parse_and_log_channel_move(cmd, buf, params,
						 acct_name, credit, debit,
						 coin_type, timestamp, tags);
	else
		/* Never guess at how to record an unknown movement type */
		err = "unknown 'type'";

	if (err)
		plugin_err(cmd->plugin,
			   "`coin_movement` payload did not scan %s: %.*s",
			   err, json_tok_full_len(params),
			   json_tok_full(buf, params));

	return notification_handled(cmd);
}

/* Notifications this plugin handles, passed to plugin_main(). */
const struct plugin_notification notifs[] = {
	{
		"coin_movement",
		json_coin_moved,
	},
	{
		"balance_snapshot",
		json_balance_snapshot,
	}
};

/* RPC commands this plugin provides, passed to plugin_main(). */
static const struct plugin_command commands[] = {
	{
		"listbalances",
		"bookkeeping",
		"List current account balances",
		"List of current accounts and their balances",
		json_list_balances
	},
};

/* Plugin init callback: sets up the accounting database from `db_dsn`.
 * Always returns NULL. */
static const char *init(struct plugin *p, const char *b, const jsmntok_t *t)
{
	// FIXME: pass in database DSN as an option??
	db = notleak(db_setup(p, p, db_dsn));

	return NULL;
}

int main(int argc, char *argv[])
{
	setup_locale();

	plugin_main(argv, init, PLUGIN_STATIC, true, NULL,
		    commands, ARRAY_SIZE(commands),
		    notifs, ARRAY_SIZE(notifs),
		    NULL, 0,
		    NULL, 0,
		    NULL);
	return 0;
}