automation

This commit is contained in:
2023-08-18 17:02:53 +02:00
commit 33b7e1dd6d
119 changed files with 13464 additions and 0 deletions

17
postgresql/connect.go Normal file
View File

@@ -0,0 +1,17 @@
package postgresql
import (
"context"
"fmt"
"github.com/jackc/pgx/v4/pgxpool"
)
func PgConnect(databaseUrl string) (*pgxpool.Pool, error) {
var err error
pgxPool, err := pgxpool.Connect(context.Background(), databaseUrl)
if err != nil {
return nil, fmt.Errorf("pgxpool.Connect(%v): %w", databaseUrl, err)
}
return pgxPool, nil
}

View File

@@ -0,0 +1,68 @@
package postgresql
import (
"context"
"fmt"
"log"
"github.com/breez/lspd/lnd"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
)
type ForwardingEventStore struct {
pool *pgxpool.Pool
}
func NewForwardingEventStore(pool *pgxpool.Pool) *ForwardingEventStore {
return &ForwardingEventStore{pool: pool}
}
func (s *ForwardingEventStore) LastForwardingEvent() (int64, error) {
var last int64
err := s.pool.QueryRow(context.Background(),
`SELECT coalesce(MAX("timestamp"), 0) AS last FROM forwarding_history`).Scan(&last)
if err != nil {
return 0, err
}
return last, nil
}
func (s *ForwardingEventStore) InsertForwardingEvents(rowSrc lnd.CopyFromSource) error {
tx, err := s.pool.Begin(context.Background())
if err != nil {
return fmt.Errorf("pgxPool.Begin() error: %w", err)
}
defer tx.Rollback(context.Background())
_, err = tx.Exec(context.Background(), `
CREATE TEMP TABLE tmp_table ON COMMIT DROP AS
SELECT *
FROM forwarding_history
WITH NO DATA;
`)
if err != nil {
return fmt.Errorf("CREATE TEMP TABLE error: %w", err)
}
count, err := tx.CopyFrom(context.Background(),
pgx.Identifier{"tmp_table"},
[]string{"timestamp", "chanid_in", "chanid_out", "amt_msat_in", "amt_msat_out"}, rowSrc)
if err != nil {
return fmt.Errorf("CopyFrom() error: %w", err)
}
log.Printf("count1: %v", count)
cmdTag, err := tx.Exec(context.Background(), `
INSERT INTO forwarding_history
SELECT *
FROM tmp_table
ON CONFLICT DO NOTHING
`)
if err != nil {
return fmt.Errorf("INSERT INTO forwarding_history error: %w", err)
}
log.Printf("count2: %v", cmdTag.RowsAffected())
return tx.Commit(context.Background())
}

View File

@@ -0,0 +1,162 @@
package postgresql
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/breez/lspd/basetypes"
"github.com/breez/lspd/interceptor"
"github.com/btcsuite/btcd/wire"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
)
type extendedParams struct {
Token string `json:"token"`
Params interceptor.OpeningFeeParams `json:"fees_params"`
}
type PostgresInterceptStore struct {
pool *pgxpool.Pool
}
func NewPostgresInterceptStore(pool *pgxpool.Pool) *PostgresInterceptStore {
return &PostgresInterceptStore{pool: pool}
}
func (s *PostgresInterceptStore) PaymentInfo(htlcPaymentHash []byte) (string, *interceptor.OpeningFeeParams, []byte, []byte, []byte, int64, int64, *wire.OutPoint, *string, error) {
var (
p, tag *string
paymentHash, paymentSecret, destination []byte
incomingAmountMsat, outgoingAmountMsat int64
fundingTxID []byte
fundingTxOutnum pgtype.Int4
)
err := s.pool.QueryRow(context.Background(),
`SELECT payment_hash, payment_secret, destination, incoming_amount_msat, outgoing_amount_msat, funding_tx_id, funding_tx_outnum, opening_fee_params, tag
FROM payments
WHERE payment_hash=$1 OR sha256('probing-01:' || payment_hash)=$1`,
htlcPaymentHash).Scan(&paymentHash, &paymentSecret, &destination, &incomingAmountMsat, &outgoingAmountMsat, &fundingTxID, &fundingTxOutnum, &p, &tag)
if err != nil {
if err == pgx.ErrNoRows {
err = nil
}
return "", nil, nil, nil, nil, 0, 0, nil, nil, err
}
var cp *wire.OutPoint
if fundingTxID != nil {
cp, err = basetypes.NewOutPoint(fundingTxID, uint32(fundingTxOutnum.Int))
if err != nil {
log.Printf("invalid funding txid in database %x", fundingTxID)
}
}
var extParams *extendedParams
if p != nil {
err = json.Unmarshal([]byte(*p), &extParams)
if err != nil {
log.Printf("Failed to unmarshal OpeningFeeParams '%s': %v", *p, err)
return "", nil, nil, nil, nil, 0, 0, nil, nil, err
}
}
return extParams.Token, &extParams.Params, paymentHash, paymentSecret, destination, incomingAmountMsat, outgoingAmountMsat, cp, tag, nil
}
func (s *PostgresInterceptStore) SetFundingTx(paymentHash []byte, channelPoint *wire.OutPoint) error {
commandTag, err := s.pool.Exec(context.Background(),
`UPDATE payments
SET funding_tx_id = $2, funding_tx_outnum = $3
WHERE payment_hash=$1`,
paymentHash, channelPoint.Hash[:], channelPoint.Index)
log.Printf("setFundingTx(%x, %s, %d): %s err: %v", paymentHash, channelPoint.Hash.String(), channelPoint.Index, commandTag, err)
return err
}
func (s *PostgresInterceptStore) RegisterPayment(token string, params *interceptor.OpeningFeeParams, destination, paymentHash, paymentSecret []byte, incomingAmountMsat, outgoingAmountMsat int64, tag string) error {
var t *string
if tag != "" {
t = &tag
}
p := []byte{}
if params != nil {
var err error
p, err = json.Marshal(extendedParams{Token: token, Params: *params})
if err != nil {
log.Printf("Failed to marshal OpeningFeeParams: %v", err)
return err
}
}
commandTag, err := s.pool.Exec(context.Background(),
`INSERT INTO
payments (destination, payment_hash, payment_secret, incoming_amount_msat, outgoing_amount_msat, tag, opening_fee_params)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT DO NOTHING`,
destination, paymentHash, paymentSecret, incomingAmountMsat, outgoingAmountMsat, t, p)
log.Printf("registerPayment(%x, %x, %x, %v, %v, %v, %s) rows: %v err: %v",
destination, paymentHash, paymentSecret, incomingAmountMsat, outgoingAmountMsat, tag, p, commandTag.RowsAffected(), err)
if err != nil {
return fmt.Errorf("registerPayment(%x, %x, %x, %v, %v, %v, %s) error: %w",
destination, paymentHash, paymentSecret, incomingAmountMsat, outgoingAmountMsat, tag, p, err)
}
return nil
}
func (s *PostgresInterceptStore) InsertChannel(initialChanID, confirmedChanId uint64, channelPoint string, nodeID []byte, lastUpdate time.Time) error {
query := `INSERT INTO
channels (initial_chanid, confirmed_chanid, channel_point, nodeid, last_update)
VALUES ($1, NULLIF($2, 0::int8), $3, $4, $5)
ON CONFLICT (channel_point) DO UPDATE SET confirmed_chanid=NULLIF($2, 0::int8), last_update=$5`
c, err := s.pool.Exec(context.Background(),
query, int64(initialChanID), int64(confirmedChanId), channelPoint, nodeID, lastUpdate)
if err != nil {
log.Printf("insertChannel(%v, %v, %s, %x) error: %v",
initialChanID, confirmedChanId, channelPoint, nodeID, err)
return fmt.Errorf("insertChannel(%v, %v, %s, %x) error: %w",
initialChanID, confirmedChanId, channelPoint, nodeID, err)
}
log.Printf("insertChannel(%v, %v, %x) result: %v",
initialChanID, confirmedChanId, nodeID, c.String())
return nil
}
func (s *PostgresInterceptStore) GetFeeParamsSettings(token string) ([]*interceptor.OpeningFeeParamsSetting, error) {
rows, err := s.pool.Query(context.Background(), `SELECT validity, params FROM new_channel_params WHERE token=$1`, token)
if err != nil {
log.Printf("GetFeeParamsSettings(%v) error: %v", token, err)
return nil, err
}
var settings []*interceptor.OpeningFeeParamsSetting
for rows.Next() {
var validity int64
var param string
err = rows.Scan(&validity, &param)
if err != nil {
return nil, err
}
var params *interceptor.OpeningFeeParams
err := json.Unmarshal([]byte(param), &params)
if err != nil {
log.Printf("Failed to unmarshal fee param '%v': %v", param, err)
return nil, err
}
duration := time.Second * time.Duration(validity)
settings = append(settings, &interceptor.OpeningFeeParamsSetting{
Validity: duration,
Params: params,
})
}
return settings, nil
}

View File

View File

View File

@@ -0,0 +1 @@
DROP TABLE public.payments;

View File

@@ -0,0 +1,5 @@
CREATE TABLE public.payments (
payment_hash bytea NOT NULL,
payment_request_out varchar NOT NULL,
CONSTRAINT payments_pkey PRIMARY KEY (payment_hash)
);

View File

@@ -0,0 +1,5 @@
ALTER TABLE public.payments DROP COLUMN payment_secret;
ALTER TABLE public.payments DROP COLUMN destination;
ALTER TABLE public.payments DROP COLUMN incoming_amount_msat;
ALTER TABLE public.payments DROP COLUMN outgoing_amount_msat;
ALTER TABLE public.payments ADD payment_request_out varchar NOT NULL;

View File

@@ -0,0 +1,5 @@
ALTER TABLE public.payments DROP COLUMN payment_request_out;
ALTER TABLE public.payments ADD payment_secret bytea NOT NULL;
ALTER TABLE public.payments ADD destination bytea NOT NULL;
ALTER TABLE public.payments ADD incoming_amount_msat bigint NOT NULL;
ALTER TABLE public.payments ADD outgoing_amount_msat bigint NOT NULL;

View File

@@ -0,0 +1,2 @@
ALTER TABLE public.payments DROP COLUMN funding_tx_id;
ALTER TABLE public.payments DROP COLUMN funding_tx_outnum;

View File

@@ -0,0 +1,2 @@
ALTER TABLE public.payments ADD funding_tx_id bytea NULL;
ALTER TABLE public.payments ADD funding_tx_outnum int NULL;

View File

@@ -0,0 +1 @@
DROP INDEX probe_payment_hash;

View File

@@ -0,0 +1 @@
CREATE INDEX probe_payment_hash ON public.payments (sha256('probing-01:' || payment_hash));

View File

@@ -0,0 +1 @@
DROP TABLE public.forwarding_history;

View File

@@ -0,0 +1,10 @@
CREATE TABLE public.forwarding_history (
"timestamp" bigint NOT NULL,
chanid_in bigint NOT NULL,
chanid_out bigint NOT NULL,
amt_msat_in bigint NOT NULL,
amt_msat_out bigint NOT NULL,
CONSTRAINT timestamp_pkey PRIMARY KEY ("timestamp")
);
CREATE INDEX forwarding_history_chanid_in_idx ON public.forwarding_history (chanid_in);
CREATE INDEX forwarding_history_chanid_out_idx ON public.forwarding_history (chanid_out);

View File

@@ -0,0 +1 @@
DROP TABLE public.channels;

View File

@@ -0,0 +1,7 @@
CREATE TABLE public.channels (
chanid bigint NOT NULL,
channel_point varchar NULL,
nodeid bytea NULL,
CONSTRAINT chanid_pkey PRIMARY KEY (chanid)
);
CREATE INDEX channels_nodeid_idx ON public.channels (nodeid);

View File

@@ -0,0 +1 @@
ALTER TABLE public.channels DROP COLUMN last_update;

View File

@@ -0,0 +1 @@
ALTER TABLE public.channels ADD COLUMN last_update TIMESTAMP;

View File

@@ -0,0 +1,21 @@
ALTER INDEX public.channels_nodeid_idx RENAME TO channels_new_nodeid_idx;
ALTER INDEX public.channels_channel_point_pkey RENAME TO channels_new_channel_point_pkey
ALTER TABLE public.channels RENAME TO channels_new;
CREATE TABLE public.channels (
chanid int8 NOT NULL,
channel_point varchar NULL,
nodeid bytea NULL,
last_update timestamp NULL,
CONSTRAINT chanid_pkey PRIMARY KEY (chanid)
);
CREATE INDEX channels_nodeid_idx ON public.channels USING btree (nodeid);
INSERT INTO public.channels
SELECT initial_chanid chanid, channel_point, nodeid, last_update FROM channels_new;
INSERT INTO public.channels
SELECT confirmed_chanid chanid, channel_point, nodeid, last_update FROM channels_new
WHERE confirmed_chanid IS NOT NULL AND confirmed_chanid <> initial_chanid;
DROP TABLE channels_new;

View File

@@ -0,0 +1,22 @@
ALTER INDEX public.channels_nodeid_idx RENAME TO channels_old_nodeid_idx;
ALTER INDEX public.chanid_pkey RENAME TO channels_old_chanid_pkey;
ALTER TABLE public.channels RENAME TO channels_old;
CREATE TABLE public.channels (
initial_chanid int8 NOT NULL,
confirmed_chanid int8 NULL,
channel_point varchar NOT NULL,
nodeid bytea NOT NULL,
last_update timestamp NULL,
CONSTRAINT channels_channel_point_pkey PRIMARY KEY (channel_point)
);
CREATE INDEX channels_nodeid_idx ON public.channels USING btree (nodeid);
INSERT INTO public.channels
SELECT
min(chanid) initial_chanid,
CASE WHEN (max(chanid) >> 40) < (3 << 17) THEN NULL ELSE max(chanid) END confirmed_chanid,
channel_point, nodeid, max(last_update) last_update
FROM channels_old GROUP BY channel_point, nodeid;
DROP TABLE public.channels_old;

View File

@@ -0,0 +1 @@
ALTER TABLE public.payments DROP COLUMN tag;

View File

@@ -0,0 +1 @@
ALTER TABLE public.payments ADD tag jsonb NULL;

View File

@@ -0,0 +1 @@
ALTER TABLE public.payments DROP COLUMN opening_fee_params;

View File

@@ -0,0 +1 @@
ALTER TABLE public.payments ADD opening_fee_params jsonb NULL;

View File

@@ -0,0 +1 @@
DROP TABLE public.new_channel_params;

View File

@@ -0,0 +1,11 @@
CREATE TABLE public.new_channel_params (
validity int NOT NULL,
params jsonb NOT NULL
);
CREATE UNIQUE INDEX new_channel_params_validity_idx ON public.new_channel_params (validity);
INSERT INTO public.new_channel_params (validity, params)
VALUES(259200, '{"min_msat": "12000000", "proportional": 7500, "max_idle_time": 4320, "max_client_to_self_delay": 432}'::jsonb);
INSERT INTO public.new_channel_params (validity, params)
VALUES(3600, '{"min_msat": "10000000", "proportional": 7500, "max_idle_time": 4320, "max_client_to_self_delay": 432}'::jsonb);

View File

@@ -0,0 +1,3 @@
ALTER TABLE public.new_channel_params DROP COLUMN token;
DROP INDEX new_channel_params_token_validity_idx;
CREATE UNIQUE INDEX new_channel_params_validity_idx ON public.new_channel_params (validity);

View File

@@ -0,0 +1,3 @@
ALTER TABLE public.new_channel_params ADD token varchar;
DROP INDEX public.new_channel_params_validity_idx;
CREATE UNIQUE INDEX new_channel_params_token_validity_idx ON public.new_channel_params (token, validity);

View File

@@ -0,0 +1,3 @@
DROP INDEX notification_subscriptions_pubkey_url_key;
DROP INDEX notification_subscriptions_pubkey_idx;
DROP TABLE public.notification_subscriptions;

View File

@@ -0,0 +1,10 @@
CREATE TABLE public.notification_subscriptions (
id bigserial primary key,
pubkey bytea NOT NULL,
url varchar NOT NULL,
created_at bigint NOT NULL,
refreshed_at bigint NOT NULL
);
CREATE INDEX notification_subscriptions_pubkey_idx ON public.notification_subscriptions (pubkey);
CREATE UNIQUE INDEX notification_subscriptions_pubkey_url_key ON public.notification_subscriptions (pubkey, url);

View File

@@ -0,0 +1,76 @@
package postgresql
import (
"context"
"encoding/hex"
"time"
"github.com/jackc/pgx/v4/pgxpool"
)
type NotificationsStore struct {
pool *pgxpool.Pool
}
func NewNotificationsStore(pool *pgxpool.Pool) *NotificationsStore {
return &NotificationsStore{pool: pool}
}
func (s *NotificationsStore) Register(
ctx context.Context,
pubkey string,
url string,
) error {
pk, err := hex.DecodeString(pubkey)
if err != nil {
return err
}
now := time.Now().UnixMicro()
_, err = s.pool.Exec(
ctx,
`INSERT INTO public.notification_subscriptions (pubkey, url, created_at, refreshed_at)
values ($1, $2, $3, $4)
ON CONFLICT (pubkey, url) DO UPDATE SET refreshed_at = $4`,
pk,
url,
now,
now,
)
return err
}
func (s *NotificationsStore) GetRegistrations(
ctx context.Context,
pubkey string,
) ([]string, error) {
pk, err := hex.DecodeString(pubkey)
if err != nil {
return nil, err
}
rows, err := s.pool.Query(
ctx,
`SELECT url
FROM public.notification_subscriptions
WHERE pubkey = $1`,
pk,
)
if err != nil {
return nil, err
}
var result []string
for rows.Next() {
var url string
err = rows.Scan(&url)
if err != nil {
return nil, err
}
result = append(result, url)
}
return result, nil
}