diff --git a/server/internal/core/application/service.go b/server/internal/core/application/service.go index da876b8..f6e455d 100644 --- a/server/internal/core/application/service.go +++ b/server/internal/core/application/service.go @@ -79,6 +79,7 @@ type service struct { trustedOnboardingScriptLock *sync.Mutex trustedOnboardingScripts map[string]*secp256k1.PublicKey + currentRound *domain.Round } func NewService( @@ -106,7 +107,7 @@ func NewService( roundLifetime, roundInterval, unilateralExitDelay, minRelayFee, walletSvc, repoManager, builder, scanner, sweeper, paymentRequests, forfeitTxs, eventsCh, onboardingCh, - &sync.Mutex{}, make(map[string]*secp256k1.PublicKey), + &sync.Mutex{}, make(map[string]*secp256k1.PublicKey), nil, } repoManager.RegisterEventsHandler( func(round *domain.Round) { @@ -219,7 +220,7 @@ func (s *service) GetRoundByTxid(ctx context.Context, poolTxid string) (*domain. } func (s *service) GetCurrentRound(ctx context.Context) (*domain.Round, error) { - return s.repoManager.Rounds().GetCurrentRound(ctx) + return domain.NewRoundFromEvents(s.currentRound.Events()), nil } func (s *service) GetInfo(ctx context.Context) (*ServiceInfo, error) { @@ -324,13 +325,9 @@ func (s *service) start() { func (s *service) startRound() { round := domain.NewRound(dustAmount) - changes, _ := round.StartRegistration() - if err := s.saveEvents( - context.Background(), round.Id, changes, - ); err != nil { - log.WithError(err).Warn("failed to store new round events") - return - } + //nolint:all + round.StartRegistration() + s.currentRound = round defer func() { time.Sleep(time.Duration(s.roundInterval/2) * time.Second) @@ -342,15 +339,16 @@ func (s *service) startRound() { func (s *service) startFinalization() { ctx := context.Background() - round, err := s.repoManager.Rounds().GetCurrentRound(ctx) - if err != nil { - log.WithError(err).Warn("failed to retrieve current round") - return - } + round := s.currentRound - var changes []domain.RoundEvent + var roundAborted bool defer func() { - if err := s.saveEvents(ctx, round.Id, changes); err != nil { + if roundAborted { + s.startRound() + return + } + + if err := s.saveEvents(ctx, round.Id, round.Events()); err != nil { log.WithError(err).Warn("failed to store new round events") } @@ -369,8 +367,9 @@ func (s *service) startFinalization() { // TODO: understand how many payments must be popped from the queue and actually registered for the round num := s.paymentRequests.len() if num == 0 { + roundAborted = true err := fmt.Errorf("no payments registered") - changes = round.Fail(fmt.Errorf("round aborted: %s", err)) + round.Fail(fmt.Errorf("round aborted: %s", err)) log.WithError(err).Debugf("round %s aborted", round.Id) return } @@ -378,23 +377,22 @@ func (s *service) startFinalization() { num = paymentsThreshold } payments := s.paymentRequests.pop(num) - changes, err = round.RegisterPayments(payments) - if err != nil { - changes = round.Fail(fmt.Errorf("failed to register payments: %s", err)) + if _, err := round.RegisterPayments(payments); err != nil { + round.Fail(fmt.Errorf("failed to register payments: %s", err)) log.WithError(err).Warn("failed to register payments") return } sweptRounds, err := s.repoManager.Rounds().GetSweptRounds(ctx) if err != nil { - changes = round.Fail(fmt.Errorf("failed to retrieve swept rounds: %s", err)) + round.Fail(fmt.Errorf("failed to retrieve swept rounds: %s", err)) log.WithError(err).Warn("failed to retrieve swept rounds") return } unsignedPoolTx, tree, connectorAddress, err := s.builder.BuildPoolTx(s.pubkey, payments, s.minRelayFee, sweptRounds) if err != nil { - changes = round.Fail(fmt.Errorf("failed to create pool tx: %s", err)) + round.Fail(fmt.Errorf("failed to create pool tx: %s", err)) log.WithError(err).Warn("failed to create pool tx") return } @@ -404,20 +402,20 @@ func (s *service) startFinalization() { connectors, forfeitTxs, err := s.builder.BuildForfeitTxs(s.pubkey, unsignedPoolTx, payments, s.minRelayFee) if err != nil { - changes = round.Fail(fmt.Errorf("failed to create connectors and forfeit txs: %s", err)) + round.Fail(fmt.Errorf("failed to create connectors and forfeit txs: %s", err)) log.WithError(err).Warn("failed to create connectors and forfeit txs") return } log.Debugf("forfeit transactions created for round %s", round.Id) - events, err := round.StartFinalization(connectorAddress, connectors, tree, unsignedPoolTx) - if err != nil { - changes = round.Fail(fmt.Errorf("failed to start finalization: %s", err)) + if _, err := round.StartFinalization( + connectorAddress, connectors, tree, unsignedPoolTx, + ); err != nil { + round.Fail(fmt.Errorf("failed to start finalization: %s", err)) log.WithError(err).Warn("failed to start finalization") return } - changes = append(changes, events...) s.forfeitTxs.push(forfeitTxs) @@ -428,15 +426,13 @@ func (s *service) finalizeRound() { defer s.startRound() ctx := context.Background() - round, err := s.repoManager.Rounds().GetCurrentRound(ctx) - if err != nil { - log.WithError(err).Warn("failed to retrieve current round") - return - } + round := s.currentRound if round.IsFailed() { return } + fmt.Printf("%+v\n", *round) + var changes []domain.RoundEvent defer func() { if err := s.saveEvents(ctx, round.Id, changes); err != nil { diff --git a/server/internal/core/domain/round_repo.go b/server/internal/core/domain/round_repo.go index 5a4b0e9..8864caf 100644 --- a/server/internal/core/domain/round_repo.go +++ b/server/internal/core/domain/round_repo.go @@ -13,7 +13,6 @@ type RoundEventRepository interface { type RoundRepository interface { AddOrUpdateRound(ctx context.Context, round Round) error - GetCurrentRound(ctx context.Context) (*Round, error) GetRoundWithId(ctx context.Context, id string) (*Round, error) GetRoundWithTxid(ctx context.Context, txid string) (*Round, error) GetSweepableRounds(ctx context.Context) ([]Round, error) diff --git a/server/internal/infrastructure/db/badger/round_repo.go b/server/internal/infrastructure/db/badger/round_repo.go index 176f16b..3b2938c 100644 --- a/server/internal/infrastructure/db/badger/round_repo.go +++ b/server/internal/infrastructure/db/badger/round_repo.go @@ -50,20 +50,6 @@ func (r *roundRepository) AddOrUpdateRound( return r.addOrUpdateRound(ctx, round) } -func (r *roundRepository) GetCurrentRound( - ctx context.Context, -) (*domain.Round, error) { - query := badgerhold.Where("Stage.Ended").Eq(false).And("Stage.Failed").Eq(false) - rounds, err := r.findRound(ctx, query) - if err != nil { - return nil, err - } - if len(rounds) <= 0 { - return nil, fmt.Errorf("ongoing round not found") - } - return &rounds[0], nil -} - func (r *roundRepository) GetRoundWithId( ctx context.Context, id string, ) (*domain.Round, error) { diff --git a/server/internal/infrastructure/db/service_test.go b/server/internal/infrastructure/db/service_test.go index 42edcdf..38ccefc 100644 --- a/server/internal/infrastructure/db/service_test.go +++ b/server/internal/infrastructure/db/service_test.go @@ -230,11 +230,6 @@ func testRoundRepository(t *testing.T, svc ports.RepoManager) { err = svc.Rounds().AddOrUpdateRound(ctx, *round) require.NoError(t, err) - currentRound, err := svc.Rounds().GetCurrentRound(ctx) - require.NoError(t, err) - require.NotNil(t, currentRound) - require.Condition(t, roundsMatch(*round, *currentRound)) - roundById, err := svc.Rounds().GetRoundWithId(ctx, roundId) require.NoError(t, err) require.NotNil(t, roundById) @@ -311,14 +306,9 @@ func testRoundRepository(t *testing.T, svc ports.RepoManager) { err = svc.Rounds().AddOrUpdateRound(ctx, *updatedRound) require.NoError(t, err) - currentRound, err = svc.Rounds().GetCurrentRound(ctx) - require.NoError(t, err) - require.NotNil(t, currentRound) - require.Condition(t, roundsMatch(*updatedRound, *currentRound)) - roundById, err = svc.Rounds().GetRoundWithId(ctx, updatedRound.Id) require.NoError(t, err) - require.NotNil(t, currentRound) + require.NotNil(t, roundById) require.Condition(t, roundsMatch(*updatedRound, *roundById)) txid := randomString(32) @@ -336,10 +326,6 @@ func testRoundRepository(t *testing.T, svc ports.RepoManager) { err = svc.Rounds().AddOrUpdateRound(ctx, *finalizedRound) require.NoError(t, err) - currentRound, err = svc.Rounds().GetCurrentRound(ctx) - require.Error(t, err) - require.Nil(t, currentRound) - roundById, err = svc.Rounds().GetRoundWithId(ctx, roundId) require.NoError(t, err) require.NotNil(t, roundById) diff --git a/server/internal/infrastructure/db/sqlite/round_repo.go b/server/internal/infrastructure/db/sqlite/round_repo.go index 6c2f5a8..47755d6 100644 --- a/server/internal/infrastructure/db/sqlite/round_repo.go +++ b/server/internal/infrastructure/db/sqlite/round_repo.go @@ -135,11 +135,10 @@ LEFT OUTER JOIN receiver ON payment.id=receiver.payment_id LEFT OUTER JOIN vtxo ON payment.id=vtxo.payment_id ` - selectCurrentRound = selectRound + " WHERE round.ended = false AND round.failed = false;" selectRoundWithId = selectRound + " WHERE round.id = ?;" selectRoundWithTxId = selectRound + " WHERE round.txid = ?;" selectSweepableRounds = selectRound + " WHERE round.swept = false AND round.ended = true AND round.failed = false;" - selectSweptRounds = selectRound + " WHERE round.swept = true AND round.failed = false AND round.ended = true;" + selectSweptRounds = selectRound + " WHERE round.swept = true AND round.failed = false AND round.ended = true AND round.connector_address <> '';" selectRoundIdsInRange = ` SELECT id FROM round WHERE starting_timestamp > ? AND starting_timestamp < ?; @@ -379,30 +378,6 @@ func (r *roundRepository) AddOrUpdateRound(ctx context.Context, round domain.Rou return tx.Commit() } -func (r *roundRepository) GetCurrentRound(ctx context.Context) (*domain.Round, error) { - stmt, err := r.db.Prepare(selectCurrentRound) - if err != nil { - return nil, err - } - defer stmt.Close() - - rows, err := stmt.Query() - if err != nil { - return nil, err - } - - rounds, err := readRoundRows(rows) - if err != nil { - return nil, err - } - - if len(rounds) == 0 { - return nil, errors.New("no current round") - } - - return rounds[0], nil -} - func (r *roundRepository) GetRoundWithId(ctx context.Context, id string) (*domain.Round, error) { stmt, err := r.db.Prepare(selectRoundWithId) if err != nil { diff --git a/server/internal/infrastructure/ocean-wallet/account.go b/server/internal/infrastructure/ocean-wallet/account.go index aa60716..1eb0887 100644 --- a/server/internal/infrastructure/ocean-wallet/account.go +++ b/server/internal/infrastructure/ocean-wallet/account.go @@ -27,9 +27,13 @@ func (s *service) DeriveConnectorAddress(ctx context.Context) (string, error) { func (s *service) ListConnectorUtxos( ctx context.Context, connectorAddress string, ) ([]ports.TxInput, error) { + addresses := make([]string, 0) + if len(connectorAddress) > 0 { + addresses = append(addresses, connectorAddress) + } res, err := s.accountClient.ListUtxos(ctx, &pb.ListUtxosRequest{ AccountName: connectorAccount, - Addresses: []string{connectorAddress}, + Addresses: addresses, }) if err != nil { return nil, err