From 99961d9fd5a2d441d3e462f2ecca422b503d7e8e Mon Sep 17 00:00:00 2001 From: Pietralberto Mazza <18440657+altafan@users.noreply.github.com> Date: Mon, 12 Feb 2024 14:07:15 +0100 Subject: [PATCH] Update event and projection stores in sync (#102) --- server/Makefile | 2 +- server/internal/core/application/service.go | 34 ++++++++++--------- server/internal/core/domain/round_repo.go | 2 +- .../infrastructure/db/badger/event_repo.go | 8 ++--- .../infrastructure/db/service_test.go | 5 +-- 5 files changed, 27 insertions(+), 24 deletions(-) diff --git a/server/Makefile b/server/Makefile index 9f233d4..215b4e2 100755 --- a/server/Makefile +++ b/server/Makefile @@ -34,7 +34,7 @@ lint: run: clean @echo "Running arkd in dev mode..." @export ARK_WALLET_ADDR=localhost:18000; \ - export ARK_ROUND_INTERVAL=30; \ + export ARK_ROUND_INTERVAL=10; \ go run ./cmd/arkd ## test: runs unit and component tests diff --git a/server/internal/core/application/service.go b/server/internal/core/application/service.go index 7a2f1fe..97a8b2e 100644 --- a/server/internal/core/application/service.go +++ b/server/internal/core/application/service.go @@ -233,8 +233,8 @@ func (s *service) start() { func (s *service) startRound() { round := domain.NewRound(dustAmount) changes, _ := round.StartRegistration() - if err := s.repoManager.Events().Save( - context.Background(), round.Id, changes..., + if err := s.saveEvents( + context.Background(), round.Id, changes, ); err != nil { log.WithError(err).Warn("failed to store new round events") return @@ -258,10 +258,8 @@ func (s *service) startFinalization() { var changes []domain.RoundEvent defer func() { - if len(changes) > 0 { - if err := s.repoManager.Events().Save(ctx, round.Id, changes...); err != nil { - log.WithError(err).Warn("failed to store new round events") - } + if err := s.saveEvents(ctx, round.Id, changes); err != nil { + log.WithError(err).Warn("failed to store new round events") } if round.IsFailed() { @@ -341,7 +339,7 @@ func (s *service) finalizeRound() { var changes []domain.RoundEvent defer func() { - if err := s.repoManager.Events().Save(ctx, round.Id, changes...); err != nil { + if err := s.saveEvents(ctx, round.Id, changes); err != nil { log.WithError(err).Warn("failed to store new round events") return } @@ -451,15 +449,6 @@ func (s *service) updateProjectionStore(round *domain.Round) { } }() } - - // Always update the status of the round. - for { - if err := s.repoManager.Rounds().AddOrUpdateRound(ctx, *round); err != nil { - time.Sleep(100 * time.Millisecond) - continue - } - break - } } func (s *service) propagateEvents(round *domain.Round) { @@ -587,6 +576,19 @@ func (s *service) extractVtxosScripts(vtxos []domain.Vtxo) ([]string, error) { return scripts, nil } +func (s *service) saveEvents( + ctx context.Context, id string, events []domain.RoundEvent, +) error { + if len(events) <= 0 { + return nil + } + round, err := s.repoManager.Events().Save(ctx, id, events...) + if err != nil { + return err + } + return s.repoManager.Rounds().AddOrUpdateRound(ctx, *round) +} + func getSpentVtxos(payments map[string]domain.Payment) []domain.VtxoKey { vtxos := make([]domain.VtxoKey, 0) for _, p := range payments { diff --git a/server/internal/core/domain/round_repo.go b/server/internal/core/domain/round_repo.go index 4f78193..cc8ea65 100644 --- a/server/internal/core/domain/round_repo.go +++ b/server/internal/core/domain/round_repo.go @@ -5,7 +5,7 @@ import ( ) type RoundEventRepository interface { - Save(ctx context.Context, id string, events ...RoundEvent) error + Save(ctx context.Context, id string, events ...RoundEvent) (*Round, error) Load(ctx context.Context, id string) (*Round, error) } diff --git a/server/internal/infrastructure/db/badger/event_repo.go b/server/internal/infrastructure/db/badger/event_repo.go index da01f54..9252fb9 100644 --- a/server/internal/infrastructure/db/badger/event_repo.go +++ b/server/internal/infrastructure/db/badger/event_repo.go @@ -59,18 +59,18 @@ func NewRoundEventRepository(config ...interface{}) (dbtypes.EventStore, error) func (r *eventRepository) Save( ctx context.Context, id string, events ...domain.RoundEvent, -) error { +) (*domain.Round, error) { allEvents, err := r.get(ctx, id) if err != nil { - return err + return nil, err } allEvents = append(allEvents, events...) if err := r.upsert(ctx, id, allEvents); err != nil { - return err + return nil, err } go r.publishEvents(allEvents) - return nil + return domain.NewRoundFromEvents(allEvents), nil } func (r *eventRepository) Load( ctx context.Context, id string, diff --git a/server/internal/infrastructure/db/service_test.go b/server/internal/infrastructure/db/service_test.go index fd77a1e..6b23c99 100644 --- a/server/internal/infrastructure/db/service_test.go +++ b/server/internal/infrastructure/db/service_test.go @@ -180,10 +180,11 @@ func testRoundEventRepository(t *testing.T, svc ports.RepoManager) { for _, f := range fixtures { svc.RegisterEventsHandler(f.handler) - err := svc.Events().Save(ctx, f.roundId, f.events...) + round, err := svc.Events().Save(ctx, f.roundId, f.events...) require.NoError(t, err) + require.NotNil(t, round) - round, err := svc.Events().Load(ctx, f.roundId) + round, err = svc.Events().Load(ctx, f.roundId) require.NoError(t, err) require.NotNil(t, round) require.Equal(t, f.roundId, round.Id)