diff --git a/go.mod b/go.mod index 21a460a..d9ef56d 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ replace github.com/ark-network/ark/common => ./pkg/common require ( github.com/ark-network/ark/common v0.0.0-00010101000000-000000000000 + github.com/google/uuid v1.3.1 github.com/sirupsen/logrus v1.9.3 github.com/spf13/viper v1.17.0 github.com/vulpemventures/go-elements v0.4.7 diff --git a/go.sum b/go.sum index c731e3c..9ac65e2 100644 --- a/go.sum +++ b/go.sum @@ -160,6 +160,8 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= diff --git a/internal/config/config.go b/internal/config/config.go index 5677749..4a7067d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,14 +9,17 @@ import ( ) type Config struct { - WalletAddr string + WalletAddr string + RoundInterval int64 } var ( - Datadir = "DATADIR" - WalletAddr = "WALLET_ADDR" + Datadir = "DATADIR" + WalletAddr = "WALLET_ADDR" + RoundInterval = "ROUND_INTERVAL" - defaultDatadir = common.AppDataDir("coordinatord", false) + defaultDatadir = common.AppDataDir("coordinatord", false) + defaultRoundInterval = 60 ) func LoadConfig() (*Config, error) { @@ -24,13 +27,15 @@ func LoadConfig() (*Config, error) { viper.AutomaticEnv() viper.SetDefault(Datadir, defaultDatadir) + viper.SetDefault(RoundInterval, defaultRoundInterval) if err := initDatadir(); err != nil { return nil, fmt.Errorf("error while creating datadir: %s", err) } cfg := &Config{ - WalletAddr: viper.GetString(WalletAddr), + WalletAddr: viper.GetString(WalletAddr), + RoundInterval: viper.GetInt64(RoundInterval), } if err := cfg.validate(); err != nil { @@ -43,6 +48,9 @@ func (c *Config) validate() error { if len(c.WalletAddr) <= 0 { return fmt.Errorf("missing wallet address") } + if c.RoundInterval < 5 { + return fmt.Errorf("round interval must be at least 5 seconds") + } return nil } diff --git a/internal/core/application/.gitkeep b/internal/core/application/.gitkeep deleted file mode 100755 index e69de29..0000000 diff --git a/internal/core/application/service.go b/internal/core/application/service.go new file mode 100644 index 0000000..aea399e --- /dev/null +++ b/internal/core/application/service.go @@ -0,0 +1,124 @@ +package application + +import ( + "context" + "fmt" + + "github.com/ark-network/ark/internal/core/domain" + "github.com/ark-network/ark/internal/core/ports" + log "github.com/sirupsen/logrus" +) + +type service struct { + roundInterval int64 + + wallet ports.WalletService + scheduler ports.SchedulerService + repoManager ports.RepoManager + builder ports.TxBuilder +} + +func NewService( + interval int64, + walletSvc ports.WalletService, schedulerSvc ports.SchedulerService, + repoManager ports.RepoManager, builder ports.TxBuilder, +) *service { + return &service{interval, walletSvc, schedulerSvc, repoManager, builder} +} + +func (s *service) Start() { + s.start() +} + +func (s *service) start() { + startImmediately := true + finalizationInterval := int64(s.roundInterval / 2) + s.scheduler.ScheduleTask(s.roundInterval, startImmediately, s.startRound) + s.scheduler.ScheduleTask(finalizationInterval, !startImmediately, s.startFinalization) + s.scheduler.ScheduleTask(s.roundInterval-1, !startImmediately, s.finalizeRound) +} + +func (s *service) startRound() { + round := domain.NewRound() + changes, _ := round.StartRegistration() + if err := s.repoManager.Events().Save(context.Background(), changes...); err != nil { + log.WithError(err).Warn("failed to store new round events") + return + } + + log.Debugf("started registration stage for new round: %s", round.Id) +} + +func (s *service) startFinalization() { + ctx := context.Background() + round, err := s.repoManager.Rounds().GetCurrentRound(ctx) + if err != nil { + log.WithError(err).Warn("failed to retrieve current round") + return + } + if round.IsFailed() { + return + } + + allPayments := make([]domain.Payment, 0, len(round.Payments)) + for _, p := range round.Payments { + allPayments = append(allPayments, p) + } + + signedPoolTx, err := s.builder.BuildPoolTx(s.wallet, allPayments) + if err != nil { + round.Fail(fmt.Errorf("failed to create pool tx: %s", err)) + log.WithError(err).Warn("failed to create pool tx") + return + } + + tree, err := s.builder.BuildCongestionTree(signedPoolTx, allPayments) + if err != nil { + round.Fail(fmt.Errorf("failed to create congestion tree: %s", err)) + log.WithError(err).Warn("failed to create congestion tree") + return + } + + connectors, forfeitTxs, err := s.builder.BuildForfeitTxs(signedPoolTx, allPayments) + if err != nil { + round.Fail(fmt.Errorf("failed to create connectors and forfeit txs: %s", err)) + log.WithError(err).Warn("failed to create connectors and forfeit txs") + return + } + + changes, _ := round.StartFinalization(connectors, forfeitTxs, tree, signedPoolTx) + + if err := s.repoManager.Events().Save(ctx, changes...); err != nil { + log.WithError(err).Warn("failed to store new round events") + return + } + + log.Debugf("started finalization stage for round: %s", round.Id) +} + +func (s *service) finalizeRound() { + ctx := context.Background() + round, err := s.repoManager.Rounds().GetCurrentRound(ctx) + if err != nil { + log.WithError(err).Warn("failed to retrieve current round") + return + } + if round.IsFailed() { + return + } + + txid, err := s.wallet.Transaction().BroadcastTransaction(ctx, round.TxHex) + if err != nil { + round.Fail(fmt.Errorf("failed to broadcast pool tx: %s", err)) + log.WithError(err).Warn("failed to broadcast pool tx") + return + } + + changes, _ := round.EndFinalization(txid) + if err := s.repoManager.Events().Save(ctx, changes...); err != nil { + log.WithError(err).Warn("failed to store new round events") + return + } + + log.Debugf("finalized round %s with pool tx %s", round.Id, round.Txid) +} diff --git a/internal/core/domain/.gitkeep b/internal/core/domain/.gitkeep deleted file mode 100755 index e69de29..0000000 diff --git a/internal/core/domain/events.go b/internal/core/domain/events.go new file mode 100644 index 0000000..992b8f3 --- /dev/null +++ b/internal/core/domain/events.go @@ -0,0 +1,49 @@ +package domain + +type RoundEvent interface { + isEvent() +} + +func (r RoundStarted) isEvent() {} +func (r RoundFinalizationStarted) isEvent() {} +func (r RoundFinalized) isEvent() {} +func (r RoundFailed) isEvent() {} +func (r InputsRegistered) isEvent() {} +func (r OutputsRegistered) isEvent() {} + +type RoundStarted struct { + Id string + Timestamp int64 +} + +type RoundFinalizationStarted struct { + Id string + ForfeitTxs []string + CongestionTree []string + Connectors []string + PoolTx string +} + +type RoundFinalized struct { + Id string + Txid string + Timestamp int64 +} + +type RoundFailed struct { + Id string + Err error + Timestamp int64 +} + +type InputsRegistered struct { + Id string + PaymentId string + PaymentInputs []Vtxo +} + +type OutputsRegistered struct { + Id string + PaymentId string + PaymentOutputs []Receiver +} diff --git a/internal/core/domain/payment.go b/internal/core/domain/payment.go new file mode 100644 index 0000000..f67bf96 --- /dev/null +++ b/internal/core/domain/payment.go @@ -0,0 +1,25 @@ +package domain + +type Payment struct { + Id string + Inputs []Vtxo + Receivers []Receiver +} + +func (p Payment) TotOutputAmount() uint64 { + tot := uint64(0) + for _, r := range p.Receivers { + tot += r.Amount + } + return tot +} + +type Vtxo struct { + Txid string + VOut uint32 +} + +type Receiver struct { + Pubkey string + Amount uint64 +} diff --git a/internal/core/domain/round.go b/internal/core/domain/round.go new file mode 100644 index 0000000..af33581 --- /dev/null +++ b/internal/core/domain/round.go @@ -0,0 +1,242 @@ +package domain + +import ( + "fmt" + "time" + + "github.com/google/uuid" +) + +const ( + UndefinedStage RoundStage = iota + RegistrationStage + FinalizationStage + + dustAmount = 450 +) + +type RoundStage int + +func (s RoundStage) String() string { + switch s { + case RegistrationStage: + return "REGISTRATION_STAGE" + case FinalizationStage: + return "FINALIZATION_STAGE" + default: + return "UNDEFINED_STAGE" + } +} + +type Stage struct { + Code RoundStage + Ended bool + Failed bool +} + +type Round struct { + Id string + StartingTimestamp int64 + EndingTimestamp int64 + Stage Stage + Payments map[string]Payment + Txid string + TxHex string + ForfeitTxs []string + CongestionTree []string + Connectors []string + Version uint + Changes []RoundEvent +} + +func NewRound() *Round { + return &Round{ + Id: uuid.New().String(), + Payments: make(map[string]Payment), + Changes: make([]RoundEvent, 0), + } +} + +func NewRoundFromEvents(events []RoundEvent) *Round { + r := &Round{} + + for _, event := range events { + r.On(event, true) + } + + r.Changes = append([]RoundEvent{}, events...) + + return r +} + +func (r *Round) On(event RoundEvent, replayed bool) { + switch e := event.(type) { + case RoundStarted: + r.Stage.Code = RegistrationStage + r.Id = e.Id + r.StartingTimestamp = e.Timestamp + case RoundFinalizationStarted: + r.Stage.Code = FinalizationStage + r.ForfeitTxs = append([]string{}, e.ForfeitTxs...) + r.CongestionTree = append([]string{}, e.CongestionTree...) + r.Connectors = append([]string{}, e.Connectors...) + r.TxHex = e.PoolTx + case RoundFinalized: + r.Stage.Ended = true + r.Txid = e.Txid + r.EndingTimestamp = e.Timestamp + case RoundFailed: + r.Stage.Failed = true + r.EndingTimestamp = e.Timestamp + case InputsRegistered: + if r.Payments == nil { + r.Payments = make(map[string]Payment) + } + r.Payments[e.PaymentId] = Payment{ + Id: e.PaymentId, + Inputs: e.PaymentInputs, + } + case OutputsRegistered: + r.Payments[e.PaymentId] = Payment{ + Id: e.PaymentId, + Inputs: r.Payments[e.PaymentId].Inputs, + Receivers: e.PaymentOutputs, + } + } + + if replayed { + r.Version++ + } +} + +func (r *Round) StartRegistration() ([]RoundEvent, error) { + empty := Stage{} + if r.Stage != empty { + return nil, fmt.Errorf("not in a valid stage to start payment registration") + } + + event := RoundStarted{ + Id: r.Id, + Timestamp: time.Now().Unix(), + } + r.raise(event) + + return []RoundEvent{event}, nil +} + +func (r *Round) StartFinalization(connectors, txs, tree []string, poolTx string) ([]RoundEvent, error) { + if r.Stage.Code != RegistrationStage || r.IsFailed() { + return nil, fmt.Errorf("not in a valid stage to start payment finalization") + } + + event := RoundFinalizationStarted{ + Id: r.Id, + ForfeitTxs: txs, + CongestionTree: tree, + Connectors: connectors, + PoolTx: poolTx, + } + r.raise(event) + + return []RoundEvent{event}, nil +} + +func (r *Round) EndFinalization(txid string) ([]RoundEvent, error) { + if r.Stage.Code != FinalizationStage || r.IsFailed() { + return nil, fmt.Errorf("not in a valid stage to end payment finalization") + } + if r.Stage.Ended { + return nil, fmt.Errorf("payment finalization already ended") + } + event := RoundFinalized{ + Id: r.Id, + Txid: txid, + Timestamp: time.Now().Unix(), + } + r.raise(event) + + return []RoundEvent{event}, nil +} + +func (r *Round) Fail(err error) []RoundEvent { + if r.Stage.Failed { + return nil + } + event := RoundFailed{ + Id: r.Id, + Err: err, + Timestamp: time.Now().Unix(), + } + r.raise(event) + + return []RoundEvent{event} +} + +func (r *Round) RegisterInputs(id string, ins []Vtxo) ([]RoundEvent, error) { + if r.Stage.Code != RegistrationStage || r.IsFailed() { + return nil, fmt.Errorf("not in a valid stage to register inputs") + } + if r.Stage.Ended { + return nil, fmt.Errorf("payment registration already ended") + } + + event := InputsRegistered{ + Id: r.Id, + PaymentId: id, + PaymentInputs: ins, + } + r.raise(event) + + return []RoundEvent{event}, nil +} + +func (r *Round) RegisterOutputs(id string, outs []Receiver) ([]RoundEvent, error) { + if r.Stage.Code != RegistrationStage || r.IsFailed() { + return nil, fmt.Errorf("not in a valid stage to register inputs") + } + if r.Stage.Ended { + return nil, fmt.Errorf("payment registration already ended") + } + + event := OutputsRegistered{ + Id: r.Id, + PaymentId: id, + PaymentOutputs: outs, + } + r.raise(event) + + return []RoundEvent{event}, nil +} + +func (r *Round) IsStarted() bool { + empty := Stage{} + return !r.IsFailed() && (r.Stage != empty && !r.IsEnded()) +} + +func (r *Round) IsEnded() bool { + return !r.IsFailed() && (r.Stage.Code == FinalizationStage && r.Stage.Ended) +} + +func (r *Round) IsFailed() bool { + return r.Stage.Failed +} + +func (r *Round) TotInputAmount() uint64 { + return uint64(len(r.Payments) * dustAmount) +} + +func (r *Round) TotOutputAmount() uint64 { + tot := uint64(0) + for _, p := range r.Payments { + tot += p.TotOutputAmount() + } + return tot +} + +func (r *Round) raise(event RoundEvent) { + if r.Changes == nil { + r.Changes = make([]RoundEvent, 0) + } + r.Changes = append(r.Changes, event) + r.On(event, false) +} diff --git a/internal/core/domain/round_repo.go b/internal/core/domain/round_repo.go new file mode 100644 index 0000000..680b095 --- /dev/null +++ b/internal/core/domain/round_repo.go @@ -0,0 +1,18 @@ +package domain + +import "context" + +type RoundEventRepository interface { + Save(ctx context.Context, events ...RoundEvent) error + Load(ctx context.Context, id string) (*Round, error) +} + +type RoundRepository interface { + AddRound(ctx context.Context, round *Round) error + GetCurrentRound(ctx context.Context) (*Round, error) + GetRoundWithId(ctx, id string) (*Round, error) + GetRoundWithTxid(ctx, txid string) (*Round, error) + UpdateRound( + ctx context.Context, id string, updateFn func(r *Round) (*Round, error), + ) error +} diff --git a/internal/core/ports/repo_manager.go b/internal/core/ports/repo_manager.go new file mode 100644 index 0000000..d2a3d3f --- /dev/null +++ b/internal/core/ports/repo_manager.go @@ -0,0 +1,8 @@ +package ports + +import "github.com/ark-network/ark/internal/core/domain" + +type RepoManager interface { + Events() domain.RoundEventRepository + Rounds() domain.RoundRepository +} diff --git a/internal/core/ports/scheduler.go b/internal/core/ports/scheduler.go new file mode 100644 index 0000000..f783714 --- /dev/null +++ b/internal/core/ports/scheduler.go @@ -0,0 +1,8 @@ +package ports + +type SchedulerService interface { + Start() + Stop() + + ScheduleTask(interval int64, immediate bool, task func()) +} diff --git a/internal/core/ports/tx_builder.go b/internal/core/ports/tx_builder.go new file mode 100644 index 0000000..c8d9552 --- /dev/null +++ b/internal/core/ports/tx_builder.go @@ -0,0 +1,9 @@ +package ports + +import "github.com/ark-network/ark/internal/core/domain" + +type TxBuilder interface { + BuildPoolTx(wallet WalletService, payments []domain.Payment) (poolTx string, err error) + BuildCongestionTree(poolTx string, payments []domain.Payment) (congestionTree []string, err error) + BuildForfeitTxs(poolTx string, payments []domain.Payment) (connectors []string, forfeitTxs []string, err error) +} diff --git a/internal/core/ports/wallet.go b/internal/core/ports/wallet.go index 5a75ae9..2a2505d 100644 --- a/internal/core/ports/wallet.go +++ b/internal/core/ports/wallet.go @@ -108,7 +108,6 @@ type TxInput interface { } type TxOutput interface { - GetAsset() string GetAmount() uint64 GetScript() string } diff --git a/internal/infrastructure/ocean-wallet/transaction.go b/internal/infrastructure/ocean-wallet/transaction.go index bf22883..b6e91d0 100644 --- a/internal/infrastructure/ocean-wallet/transaction.go +++ b/internal/infrastructure/ocean-wallet/transaction.go @@ -118,7 +118,6 @@ func (l outputList) toProto() []*pb.Output { list := make([]*pb.Output, 0, len(l)) for _, out := range l { list = append(list, &pb.Output{ - Asset: out.GetAsset(), Amount: out.GetAmount(), Script: out.GetScript(), })