mirror of
https://github.com/aljazceru/ark.git
synced 2025-12-18 04:34:19 +01:00
Add integration tests for sweeping rounds (#339)
* add "block" scheduler type + sweep integration test * increase timeout in integrationtests * remove config logs * rename scheduler package name * rename package * rename packages
This commit is contained in:
@@ -50,16 +50,18 @@ type AdminService interface {
|
||||
}
|
||||
|
||||
type adminService struct {
|
||||
walletSvc ports.WalletService
|
||||
repoManager ports.RepoManager
|
||||
txBuilder ports.TxBuilder
|
||||
walletSvc ports.WalletService
|
||||
repoManager ports.RepoManager
|
||||
txBuilder ports.TxBuilder
|
||||
sweeperTimeUnit ports.TimeUnit
|
||||
}
|
||||
|
||||
func NewAdminService(walletSvc ports.WalletService, repoManager ports.RepoManager, txBuilder ports.TxBuilder) AdminService {
|
||||
func NewAdminService(walletSvc ports.WalletService, repoManager ports.RepoManager, txBuilder ports.TxBuilder, timeUnit ports.TimeUnit) AdminService {
|
||||
return &adminService{
|
||||
walletSvc: walletSvc,
|
||||
repoManager: repoManager,
|
||||
txBuilder: txBuilder,
|
||||
walletSvc: walletSvc,
|
||||
repoManager: repoManager,
|
||||
txBuilder: txBuilder,
|
||||
sweeperTimeUnit: timeUnit,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,7 +132,7 @@ func (a *adminService) GetScheduledSweeps(ctx context.Context) ([]ScheduledSweep
|
||||
|
||||
for _, round := range sweepableRounds {
|
||||
sweepable, err := findSweepableOutputs(
|
||||
ctx, a.walletSvc, a.txBuilder, round.CongestionTree,
|
||||
ctx, a.walletSvc, a.txBuilder, a.sweeperTimeUnit, round.CongestionTree,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -177,7 +177,7 @@ func (s *covenantService) SpendVtxos(ctx context.Context, inputs []ports.Input)
|
||||
return "", fmt.Errorf("failed to parse tx %s: %s", input.Txid, err)
|
||||
}
|
||||
|
||||
confirmed, blocktime, err := s.wallet.IsTransactionConfirmed(ctx, input.Txid)
|
||||
confirmed, _, blocktime, err := s.wallet.IsTransactionConfirmed(ctx, input.Txid)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to check tx %s: %s", input.Txid, err)
|
||||
}
|
||||
@@ -910,12 +910,10 @@ func (s *covenantService) scheduleSweepVtxosForRound(round *domain.Round) {
|
||||
return
|
||||
}
|
||||
|
||||
expirationTimestamp := time.Now().Add(
|
||||
time.Duration(s.roundLifetime+30) * time.Second,
|
||||
)
|
||||
expirationTime := s.sweeper.scheduler.AddNow(s.roundLifetime)
|
||||
|
||||
if err := s.sweeper.schedule(
|
||||
expirationTimestamp.Unix(), round.Txid, round.CongestionTree,
|
||||
expirationTime, round.Txid, round.CongestionTree,
|
||||
); err != nil {
|
||||
log.WithError(err).Warn("failed to schedule sweep tx")
|
||||
}
|
||||
|
||||
@@ -421,7 +421,7 @@ func (s *covenantlessService) SpendVtxos(ctx context.Context, inputs []ports.Inp
|
||||
return "", fmt.Errorf("failed to deserialize tx %s: %s", input.Txid, err)
|
||||
}
|
||||
|
||||
confirmed, blocktime, err := s.wallet.IsTransactionConfirmed(ctx, input.Txid)
|
||||
confirmed, _, blocktime, err := s.wallet.IsTransactionConfirmed(ctx, input.Txid)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to check tx %s: %s", input.Txid, err)
|
||||
}
|
||||
@@ -1316,13 +1316,9 @@ func (s *covenantlessService) scheduleSweepVtxosForRound(round *domain.Round) {
|
||||
return
|
||||
}
|
||||
|
||||
expirationTimestamp := time.Now().Add(
|
||||
time.Duration(s.roundLifetime+30) * time.Second,
|
||||
)
|
||||
expirationTimestamp := s.sweeper.scheduler.AddNow(s.roundLifetime)
|
||||
|
||||
if err := s.sweeper.schedule(
|
||||
expirationTimestamp.Unix(), round.Txid, round.CongestionTree,
|
||||
); err != nil {
|
||||
if err := s.sweeper.schedule(expirationTimestamp, round.Txid, round.CongestionTree); err != nil {
|
||||
log.WithError(err).Warn("failed to schedule sweep tx")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package application
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ark-network/ark/common/tree"
|
||||
@@ -22,6 +23,7 @@ type sweeper struct {
|
||||
scheduler ports.SchedulerService
|
||||
|
||||
// cache of scheduled tasks, avoid scheduling the same sweep event multiple times
|
||||
locker sync.Locker
|
||||
scheduledTasks map[string]struct{}
|
||||
}
|
||||
|
||||
@@ -36,6 +38,7 @@ func newSweeper(
|
||||
repoManager,
|
||||
builder,
|
||||
scheduler,
|
||||
&sync.Mutex{},
|
||||
make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
@@ -62,6 +65,8 @@ func (s *sweeper) stop() {
|
||||
|
||||
// removeTask update the cached map of scheduled tasks
|
||||
func (s *sweeper) removeTask(treeRootTxid string) {
|
||||
s.locker.Lock()
|
||||
defer s.locker.Unlock()
|
||||
delete(s.scheduledTasks, treeRootTxid)
|
||||
}
|
||||
|
||||
@@ -84,13 +89,22 @@ func (s *sweeper) schedule(
|
||||
}
|
||||
|
||||
task := s.createTask(roundTxid, congestionTree)
|
||||
fancyTime := time.Unix(expirationTimestamp, 0).Format("2006-01-02 15:04:05")
|
||||
|
||||
var fancyTime string
|
||||
if s.scheduler.Unit() == ports.UnixTime {
|
||||
fancyTime = time.Unix(expirationTimestamp, 0).Format("2006-01-02 15:04:05")
|
||||
} else {
|
||||
fancyTime = fmt.Sprintf("block %d", expirationTimestamp)
|
||||
}
|
||||
log.Debugf("scheduled sweep for round %s at %s", roundTxid, fancyTime)
|
||||
|
||||
if err := s.scheduler.ScheduleTaskOnce(expirationTimestamp, task); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.locker.Lock()
|
||||
s.scheduledTasks[root.Txid] = struct{}{}
|
||||
s.locker.Unlock()
|
||||
|
||||
if err := s.updateVtxoExpirationTime(congestionTree, expirationTimestamp); err != nil {
|
||||
log.WithError(err).Error("error while updating vtxo expiration time")
|
||||
@@ -120,7 +134,7 @@ func (s *sweeper) createTask(
|
||||
vtxoKeys := make([]domain.VtxoKey, 0) // vtxos associated to the sweep inputs
|
||||
|
||||
// inspect the congestion tree to find onchain shared outputs
|
||||
sharedOutputs, err := findSweepableOutputs(ctx, s.wallet, s.builder, congestionTree)
|
||||
sharedOutputs, err := findSweepableOutputs(ctx, s.wallet, s.builder, s.scheduler.Unit(), congestionTree)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error while inspecting congestion tree")
|
||||
return
|
||||
@@ -128,7 +142,7 @@ func (s *sweeper) createTask(
|
||||
|
||||
for expiredAt, inputs := range sharedOutputs {
|
||||
// if the shared outputs are not expired, schedule a sweep task for it
|
||||
if time.Unix(expiredAt, 0).After(time.Now()) {
|
||||
if s.scheduler.AfterNow(expiredAt) {
|
||||
subtrees, err := computeSubTrees(congestionTree, inputs)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error while computing subtrees")
|
||||
@@ -136,8 +150,7 @@ func (s *sweeper) createTask(
|
||||
}
|
||||
|
||||
for _, subTree := range subtrees {
|
||||
// mitigate the risk to get BIP68 non-final errors by scheduling the task 30 seconds after the expiration time
|
||||
if err := s.schedule(int64(expiredAt), roundTxid, subTree); err != nil {
|
||||
if err := s.schedule(expiredAt, roundTxid, subTree); err != nil {
|
||||
log.WithError(err).Error("error while scheduling sweep task")
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -259,17 +259,18 @@ func findSweepableOutputs(
|
||||
ctx context.Context,
|
||||
walletSvc ports.WalletService,
|
||||
txbuilder ports.TxBuilder,
|
||||
schedulerUnit ports.TimeUnit,
|
||||
congestionTree tree.CongestionTree,
|
||||
) (map[int64][]ports.SweepInput, error) {
|
||||
sweepableOutputs := make(map[int64][]ports.SweepInput)
|
||||
blocktimeCache := make(map[string]int64) // txid -> blocktime
|
||||
blocktimeCache := make(map[string]int64) // txid -> blocktime / blockheight
|
||||
nodesToCheck := congestionTree[0] // init with the root
|
||||
|
||||
for len(nodesToCheck) > 0 {
|
||||
newNodesToCheck := make([]tree.Node, 0)
|
||||
|
||||
for _, node := range nodesToCheck {
|
||||
isConfirmed, blocktime, err := walletSvc.IsTransactionConfirmed(ctx, node.Txid)
|
||||
isConfirmed, height, blocktime, err := walletSvc.IsTransactionConfirmed(ctx, node.Txid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -279,21 +280,31 @@ func findSweepableOutputs(
|
||||
|
||||
if !isConfirmed {
|
||||
if _, ok := blocktimeCache[node.ParentTxid]; !ok {
|
||||
isConfirmed, blocktime, err := walletSvc.IsTransactionConfirmed(ctx, node.ParentTxid)
|
||||
isConfirmed, height, blocktime, err := walletSvc.IsTransactionConfirmed(ctx, node.ParentTxid)
|
||||
if !isConfirmed || err != nil {
|
||||
return nil, fmt.Errorf("tx %s not found", node.ParentTxid)
|
||||
}
|
||||
|
||||
blocktimeCache[node.ParentTxid] = blocktime
|
||||
if schedulerUnit == ports.BlockHeight {
|
||||
blocktimeCache[node.ParentTxid] = height
|
||||
} else {
|
||||
blocktimeCache[node.ParentTxid] = blocktime
|
||||
}
|
||||
}
|
||||
|
||||
expirationTime, sweepInput, err = txbuilder.GetSweepInput(blocktimeCache[node.ParentTxid], node)
|
||||
var lifetime int64
|
||||
lifetime, sweepInput, err = txbuilder.GetSweepInput(node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
expirationTime = blocktimeCache[node.ParentTxid] + lifetime
|
||||
} else {
|
||||
// cache the blocktime for future use
|
||||
blocktimeCache[node.Txid] = int64(blocktime)
|
||||
if schedulerUnit == ports.BlockHeight {
|
||||
blocktimeCache[node.Txid] = height
|
||||
} else {
|
||||
blocktimeCache[node.Txid] = blocktime
|
||||
}
|
||||
|
||||
// if the tx is onchain, it means that the input is spent
|
||||
// add the children to the nodes in order to check them during the next iteration
|
||||
|
||||
Reference in New Issue
Block a user