Update event and projection stores in sync (#102)

This commit is contained in:
Pietralberto Mazza
2024-02-12 14:07:15 +01:00
committed by GitHub
parent dc00d60585
commit 99961d9fd5
5 changed files with 27 additions and 24 deletions

View File

@@ -34,7 +34,7 @@ lint:
run: clean
@echo "Running arkd in dev mode..."
@export ARK_WALLET_ADDR=localhost:18000; \
export ARK_ROUND_INTERVAL=30; \
export ARK_ROUND_INTERVAL=10; \
go run ./cmd/arkd
## test: runs unit and component tests

View File

@@ -233,8 +233,8 @@ func (s *service) start() {
func (s *service) startRound() {
round := domain.NewRound(dustAmount)
changes, _ := round.StartRegistration()
if err := s.repoManager.Events().Save(
context.Background(), round.Id, changes...,
if err := s.saveEvents(
context.Background(), round.Id, changes,
); err != nil {
log.WithError(err).Warn("failed to store new round events")
return
@@ -258,10 +258,8 @@ func (s *service) startFinalization() {
var changes []domain.RoundEvent
defer func() {
if len(changes) > 0 {
if err := s.repoManager.Events().Save(ctx, round.Id, changes...); err != nil {
log.WithError(err).Warn("failed to store new round events")
}
if err := s.saveEvents(ctx, round.Id, changes); err != nil {
log.WithError(err).Warn("failed to store new round events")
}
if round.IsFailed() {
@@ -341,7 +339,7 @@ func (s *service) finalizeRound() {
var changes []domain.RoundEvent
defer func() {
if err := s.repoManager.Events().Save(ctx, round.Id, changes...); err != nil {
if err := s.saveEvents(ctx, round.Id, changes); err != nil {
log.WithError(err).Warn("failed to store new round events")
return
}
@@ -451,15 +449,6 @@ func (s *service) updateProjectionStore(round *domain.Round) {
}
}()
}
// Always update the status of the round.
for {
if err := s.repoManager.Rounds().AddOrUpdateRound(ctx, *round); err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
break
}
}
func (s *service) propagateEvents(round *domain.Round) {
@@ -587,6 +576,19 @@ func (s *service) extractVtxosScripts(vtxos []domain.Vtxo) ([]string, error) {
return scripts, nil
}
func (s *service) saveEvents(
ctx context.Context, id string, events []domain.RoundEvent,
) error {
if len(events) <= 0 {
return nil
}
round, err := s.repoManager.Events().Save(ctx, id, events...)
if err != nil {
return err
}
return s.repoManager.Rounds().AddOrUpdateRound(ctx, *round)
}
func getSpentVtxos(payments map[string]domain.Payment) []domain.VtxoKey {
vtxos := make([]domain.VtxoKey, 0)
for _, p := range payments {

View File

@@ -5,7 +5,7 @@ import (
)
type RoundEventRepository interface {
Save(ctx context.Context, id string, events ...RoundEvent) error
Save(ctx context.Context, id string, events ...RoundEvent) (*Round, error)
Load(ctx context.Context, id string) (*Round, error)
}

View File

@@ -59,18 +59,18 @@ func NewRoundEventRepository(config ...interface{}) (dbtypes.EventStore, error)
func (r *eventRepository) Save(
ctx context.Context, id string, events ...domain.RoundEvent,
) error {
) (*domain.Round, error) {
allEvents, err := r.get(ctx, id)
if err != nil {
return err
return nil, err
}
allEvents = append(allEvents, events...)
if err := r.upsert(ctx, id, allEvents); err != nil {
return err
return nil, err
}
go r.publishEvents(allEvents)
return nil
return domain.NewRoundFromEvents(allEvents), nil
}
func (r *eventRepository) Load(
ctx context.Context, id string,

View File

@@ -180,10 +180,11 @@ func testRoundEventRepository(t *testing.T, svc ports.RepoManager) {
for _, f := range fixtures {
svc.RegisterEventsHandler(f.handler)
err := svc.Events().Save(ctx, f.roundId, f.events...)
round, err := svc.Events().Save(ctx, f.roundId, f.events...)
require.NoError(t, err)
require.NotNil(t, round)
round, err := svc.Events().Load(ctx, f.roundId)
round, err = svc.Events().Load(ctx, f.roundId)
require.NoError(t, err)
require.NotNil(t, round)
require.Equal(t, f.roundId, round.Id)