Wait for boarding tx confirmation server-side (#129)

* wait for onboarding confirmation server side

* Update server/internal/core/application/service.go

Co-authored-by: Pietralberto Mazza <18440657+altafan@users.noreply.github.com>
Signed-off-by: Louis Singer <41042567+louisinger@users.noreply.github.com>

* Update server/internal/core/application/service.go

Co-authored-by: Pietralberto Mazza <18440657+altafan@users.noreply.github.com>
Signed-off-by: Louis Singer <41042567+louisinger@users.noreply.github.com>

* handleOnboarding func: sleep 30s if an error happens then retry

---------

Signed-off-by: Louis Singer <41042567+louisinger@users.noreply.github.com>
Co-authored-by: Pietralberto Mazza <18440657+altafan@users.noreply.github.com>
This commit is contained in:
Louis Singer
2024-03-08 17:24:47 +01:00
committed by GitHub
parent 767935407d
commit 834af545de
2 changed files with 81 additions and 38 deletions

View File

@@ -38,6 +38,12 @@ type Service interface {
Onboard(ctx context.Context, boardingTx string, congestionTree tree.CongestionTree, userPubkey *secp256k1.PublicKey) error
}
type onboarding struct {
tx string
congestionTree tree.CongestionTree
userPubkey *secp256k1.PublicKey
}
type service struct {
network common.Network
onchainNework network.Network
@@ -56,7 +62,8 @@ type service struct {
paymentRequests *paymentsMap
forfeitTxs *forfeitTxsMap
eventsCh chan domain.RoundEvent
eventsCh chan domain.RoundEvent
onboardingCh chan onboarding
}
func NewService(
@@ -67,6 +74,7 @@ func NewService(
scheduler ports.SchedulerService,
) (Service, error) {
eventsCh := make(chan domain.RoundEvent)
onboardingCh := make(chan onboarding)
paymentRequests := newPaymentsMap(nil)
genesisHash, _ := chainhash.NewHashFromStr(onchainNetwork.GenesisBlockHash)
@@ -82,7 +90,7 @@ func NewService(
network, onchainNetwork, pubkey,
roundLifetime, roundInterval, unilateralExitDelay, minRelayFee,
walletSvc, repoManager, builder, scanner, sweeper,
paymentRequests, forfeitTxs, eventsCh,
paymentRequests, forfeitTxs, eventsCh, onboardingCh,
}
repoManager.RegisterEventsHandler(
func(round *domain.Round) {
@@ -96,6 +104,7 @@ func NewService(
return nil, fmt.Errorf("failed to restore watching vtxos: %s", err)
}
go svc.listenToRedemptions()
go svc.listenToOnboarding()
return svc, nil
}
@@ -124,6 +133,8 @@ func (s *service) Stop() {
log.Debug("closed connection to wallet")
s.repoManager.Close()
log.Debug("closed connection to db")
close(s.eventsCh)
close(s.onboardingCh)
}
func (s *service) SpendVtxos(ctx context.Context, inputs []domain.VtxoKey) (string, error) {
@@ -200,24 +211,30 @@ func (s *service) Onboard(
return fmt.Errorf("failed to parse boarding tx: %s", err)
}
utx, _ := ptx.UnsignedTx()
txid := utx.TxHash().String()
isConfirmed, _, err := s.wallet.IsTransactionConfirmed(ctx, txid)
extracted, err := psetv2.Extract(ptx)
if err != nil {
return fmt.Errorf("failed to fetch confirmation info for boaridng tx: %s", err)
}
if !isConfirmed {
return fmt.Errorf("boarding tx not confirmed yet, please retry later")
return fmt.Errorf("failed to extract boarding tx: %s", err)
}
pubkey := hex.EncodeToString(userPubkey.SerializeCompressed())
payments := getPaymentsFromOnboarding(congestionTree, pubkey)
round := domain.NewFinalizedRound(
dustAmount, pubkey, txid, boardingTx, congestionTree, payments,
)
boardingTxHex, err := extracted.ToHex()
if err != nil {
return fmt.Errorf("failed to convert boarding tx to hex: %s", err)
}
return s.saveEvents(ctx, round.Id, round.Events())
txid, err := s.wallet.BroadcastTransaction(ctx, boardingTxHex)
if err != nil {
return fmt.Errorf("failed to broadcast boarding tx: %s", err)
}
log.Debugf("broadcasted boarding tx %s", txid)
s.onboardingCh <- onboarding{
tx: boardingTx,
congestionTree: congestionTree,
userPubkey: userPubkey,
}
return nil
}
func (s *service) start() {
@@ -367,6 +384,53 @@ func (s *service) finalizeRound() {
log.Debugf("finalized round %s with pool tx %s", round.Id, round.Txid)
}
func (s *service) listenToOnboarding() {
for onboarding := range s.onboardingCh {
go s.handleOnboarding(onboarding)
}
}
func (s *service) handleOnboarding(onboarding onboarding) {
ctx := context.Background()
ptx, _ := psetv2.NewPsetFromBase64(onboarding.tx)
utx, _ := psetv2.Extract(ptx)
txid := utx.TxHash().String()
// wait for the tx to be confirmed with a timeout
timeout := time.NewTimer(5 * time.Minute)
defer timeout.Stop()
isConfirmed := false
for !isConfirmed {
select {
case <-timeout.C:
log.WithError(fmt.Errorf("operation timed out")).Warnf("failed to get confirmation for boarding tx %s", txid)
return
default:
var err error
isConfirmed, _, err = s.wallet.IsTransactionConfirmed(ctx, txid)
if err != nil {
log.WithError(err).Warn("failed to check tx confirmation")
}
if err != nil || !isConfirmed {
time.Sleep(30 * time.Second)
}
}
}
pubkey := hex.EncodeToString(onboarding.userPubkey.SerializeCompressed())
payments := getPaymentsFromOnboarding(onboarding.congestionTree, pubkey)
round := domain.NewFinalizedRound(
dustAmount, pubkey, txid, onboarding.tx, onboarding.congestionTree, payments,
)
if err := s.saveEvents(ctx, round.Id, round.Events()); err != nil {
log.WithError(err).Warn("failed to store new round events")
return
}
}
func (s *service) listenToRedemptions() {
ctx := context.Background()
chVtxos := s.scanner.GetNotificationChannel(ctx)