lsps2: cleanup expired promises

This commit is contained in:
Jesse de Wit
2023-09-11 13:11:10 +02:00
parent ba1e4074eb
commit fb3b051d02
6 changed files with 110 additions and 4 deletions

43
lsps2/cleanup_service.go Normal file
View File

@@ -0,0 +1,43 @@
package lsps2
import (
"context"
"log"
"time"
)
type CleanupService struct {
store Lsps2Store
}
// The interval to clean unused promises and buy registrations.
var CleanupInterval time.Duration = time.Hour
// The relax period is a period where expired promises may still be valid, if
// the current chainfees are cheaper than the fees in the promise itself. It is
// set to ~two weeks.
var RelaxPeriod time.Duration = time.Hour * 24 * 14
func NewCleanupService(store Lsps2Store) *CleanupService {
return &CleanupService{
store: store,
}
}
// Periodically cleans up unused buy registrations and promises that have
// expired before the relax interval.
func (c *CleanupService) Start(ctx context.Context) {
for {
before := time.Now().UTC().Add(-RelaxPeriod)
err := c.store.RemoveUnusedExpired(ctx, before)
if err != nil {
log.Printf("Failed to remove unused expired registrations before %v: %v", before, err)
}
select {
case <-time.After(CleanupInterval):
continue
case <-ctx.Done():
return
}
}
}

View File

@@ -91,6 +91,10 @@ func (s *mockLsps2Store) SavePromises(ctx context.Context, req *SavePromises) er
return nil return nil
} }
func (s *mockLsps2Store) RemoveUnusedExpired(ctx context.Context, before time.Time) error {
return nil
}
type mockLightningClient struct { type mockLightningClient struct {
openResponses []*wire.OutPoint openResponses []*wire.OutPoint
openRequests []*lightning.OpenChannelRequest openRequests []*lightning.OpenChannelRequest

View File

@@ -69,4 +69,5 @@ type Lsps2Store interface {
GetBuyRegistration(ctx context.Context, scid lightning.ShortChannelID) (*BuyRegistration, error) GetBuyRegistration(ctx context.Context, scid lightning.ShortChannelID) (*BuyRegistration, error)
SetChannelOpened(ctx context.Context, channelOpened *ChannelOpened) error SetChannelOpened(ctx context.Context, channelOpened *ChannelOpened) error
SetCompleted(ctx context.Context, registrationId uint64) error SetCompleted(ctx context.Context, registrationId uint64) error
RemoveUnusedExpired(ctx context.Context, before time.Time) error
} }

View File

@@ -102,6 +102,8 @@ func main() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
openingService := shared.NewOpeningService(openingStore, nodesService) openingService := shared.NewOpeningService(openingStore, nodesService)
cleanupService := lsps2.NewCleanupService(lsps2Store)
go cleanupService.Start(ctx)
var interceptors []interceptor.HtlcInterceptor var interceptors []interceptor.HtlcInterceptor
for _, node := range nodes { for _, node := range nodes {
var htlcInterceptor interceptor.HtlcInterceptor var htlcInterceptor interceptor.HtlcInterceptor

View File

@@ -3,9 +3,12 @@ package postgresql
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"strings" "strings"
"time"
"github.com/breez/lspd/lightning" "github.com/breez/lspd/lightning"
"github.com/breez/lspd/lsps0"
"github.com/breez/lspd/lsps2" "github.com/breez/lspd/lsps2"
"github.com/breez/lspd/shared" "github.com/breez/lspd/shared"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
@@ -233,13 +236,64 @@ func (s *Lsps2Store) SavePromises(
rows := [][]interface{}{} rows := [][]interface{}{}
for _, p := range req.Menu { for _, p := range req.Menu {
rows = append(rows, []interface{}{p.Promise, req.Token}) rows = append(rows, []interface{}{p.Promise, req.Token, p.ValidUntil})
} }
_, err := s.pool.CopyFrom( _, err := s.pool.CopyFrom(
ctx, ctx,
pgx.Identifier{"lsps2", "promises"}, pgx.Identifier{"lsps2", "promises"},
[]string{"promise", "token"}, []string{"promise", "token", "valid_until"},
pgx.CopyFromRows(rows), pgx.CopyFromRows(rows),
) )
return err return err
} }
func (s *Lsps2Store) RemoveUnusedExpired(
ctx context.Context,
before time.Time,
) error {
tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return err
}
// Rollback will not do anything if Commit() has already been called.
defer tx.Rollback(ctx)
timestamp := before.Format(lsps0.TIME_FORMAT)
// Promises can be deleted without issue.
tag, err := tx.Exec(
ctx,
`DELETE FROM lsps2.buy_registrations r
WHERE r.id IN (
SELECT sr.id
FROM lsps2.buy_registrations sr
LEFT JOIN lsps2.bought_channels sb ON sr.id = sb.registration_id
WHERE sb.registration_id IS NULL
AND sr.params_valid_until < $1
)`,
timestamp,
)
if err != nil {
return err
}
rowsAffected := tag.RowsAffected()
if rowsAffected > 0 {
log.Printf("Deleted %d expired buy registrations before %s", rowsAffected, timestamp)
}
tag, err = tx.Exec(
ctx,
`DELETE FROM lsps2.promises
WHERE valid_until < $1`,
timestamp,
)
if err != nil {
return err
}
rowsAffected = tag.RowsAffected()
if rowsAffected > 0 {
log.Printf("Deleted %d expired promises before %s", rowsAffected, timestamp)
}
return tx.Commit(ctx)
}

View File

@@ -34,5 +34,7 @@ CREATE INDEX idx_lsps2_bought_channels_registration_id ON lsps2.bought_channels
CREATE TABLE lsps2.promises ( CREATE TABLE lsps2.promises (
promise varchar PRIMARY KEY, promise varchar PRIMARY KEY,
token varchar NOT NULL token varchar NOT NULL,
) valid_until varchar NOT NULL
);
CREATE INDEX idx_lsps2_promises_valid_until ON lsps2.promises (valid_until);