mirror of
https://github.com/aljazceru/ark.git
synced 2026-02-11 14:34:24 +01:00
Add core logic (#11)
* Add domain and app layers * Update repo interface * Lint * Rename events & fixes * Add tx builder interface
This commit is contained in:
committed by
GitHub
parent
b0bd610944
commit
27b54f4c41
1
go.mod
1
go.mod
@@ -6,6 +6,7 @@ replace github.com/ark-network/ark/common => ./pkg/common
|
||||
|
||||
require (
|
||||
github.com/ark-network/ark/common v0.0.0-00010101000000-000000000000
|
||||
github.com/google/uuid v1.3.1
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
github.com/spf13/viper v1.17.0
|
||||
github.com/vulpemventures/go-elements v0.4.7
|
||||
|
||||
2
go.sum
2
go.sum
@@ -160,6 +160,8 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe
|
||||
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
|
||||
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
|
||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
|
||||
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
|
||||
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
|
||||
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
|
||||
|
||||
@@ -9,14 +9,17 @@ import (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
WalletAddr string
|
||||
WalletAddr string
|
||||
RoundInterval int64
|
||||
}
|
||||
|
||||
var (
|
||||
Datadir = "DATADIR"
|
||||
WalletAddr = "WALLET_ADDR"
|
||||
Datadir = "DATADIR"
|
||||
WalletAddr = "WALLET_ADDR"
|
||||
RoundInterval = "ROUND_INTERVAL"
|
||||
|
||||
defaultDatadir = common.AppDataDir("coordinatord", false)
|
||||
defaultDatadir = common.AppDataDir("coordinatord", false)
|
||||
defaultRoundInterval = 60
|
||||
)
|
||||
|
||||
func LoadConfig() (*Config, error) {
|
||||
@@ -24,13 +27,15 @@ func LoadConfig() (*Config, error) {
|
||||
viper.AutomaticEnv()
|
||||
|
||||
viper.SetDefault(Datadir, defaultDatadir)
|
||||
viper.SetDefault(RoundInterval, defaultRoundInterval)
|
||||
|
||||
if err := initDatadir(); err != nil {
|
||||
return nil, fmt.Errorf("error while creating datadir: %s", err)
|
||||
}
|
||||
|
||||
cfg := &Config{
|
||||
WalletAddr: viper.GetString(WalletAddr),
|
||||
WalletAddr: viper.GetString(WalletAddr),
|
||||
RoundInterval: viper.GetInt64(RoundInterval),
|
||||
}
|
||||
|
||||
if err := cfg.validate(); err != nil {
|
||||
@@ -43,6 +48,9 @@ func (c *Config) validate() error {
|
||||
if len(c.WalletAddr) <= 0 {
|
||||
return fmt.Errorf("missing wallet address")
|
||||
}
|
||||
if c.RoundInterval < 5 {
|
||||
return fmt.Errorf("round interval must be at least 5 seconds")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
124
internal/core/application/service.go
Normal file
124
internal/core/application/service.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package application
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/ark-network/ark/internal/core/domain"
|
||||
"github.com/ark-network/ark/internal/core/ports"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type service struct {
|
||||
roundInterval int64
|
||||
|
||||
wallet ports.WalletService
|
||||
scheduler ports.SchedulerService
|
||||
repoManager ports.RepoManager
|
||||
builder ports.TxBuilder
|
||||
}
|
||||
|
||||
func NewService(
|
||||
interval int64,
|
||||
walletSvc ports.WalletService, schedulerSvc ports.SchedulerService,
|
||||
repoManager ports.RepoManager, builder ports.TxBuilder,
|
||||
) *service {
|
||||
return &service{interval, walletSvc, schedulerSvc, repoManager, builder}
|
||||
}
|
||||
|
||||
func (s *service) Start() {
|
||||
s.start()
|
||||
}
|
||||
|
||||
func (s *service) start() {
|
||||
startImmediately := true
|
||||
finalizationInterval := int64(s.roundInterval / 2)
|
||||
s.scheduler.ScheduleTask(s.roundInterval, startImmediately, s.startRound)
|
||||
s.scheduler.ScheduleTask(finalizationInterval, !startImmediately, s.startFinalization)
|
||||
s.scheduler.ScheduleTask(s.roundInterval-1, !startImmediately, s.finalizeRound)
|
||||
}
|
||||
|
||||
func (s *service) startRound() {
|
||||
round := domain.NewRound()
|
||||
changes, _ := round.StartRegistration()
|
||||
if err := s.repoManager.Events().Save(context.Background(), changes...); err != nil {
|
||||
log.WithError(err).Warn("failed to store new round events")
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("started registration stage for new round: %s", round.Id)
|
||||
}
|
||||
|
||||
func (s *service) startFinalization() {
|
||||
ctx := context.Background()
|
||||
round, err := s.repoManager.Rounds().GetCurrentRound(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("failed to retrieve current round")
|
||||
return
|
||||
}
|
||||
if round.IsFailed() {
|
||||
return
|
||||
}
|
||||
|
||||
allPayments := make([]domain.Payment, 0, len(round.Payments))
|
||||
for _, p := range round.Payments {
|
||||
allPayments = append(allPayments, p)
|
||||
}
|
||||
|
||||
signedPoolTx, err := s.builder.BuildPoolTx(s.wallet, allPayments)
|
||||
if err != nil {
|
||||
round.Fail(fmt.Errorf("failed to create pool tx: %s", err))
|
||||
log.WithError(err).Warn("failed to create pool tx")
|
||||
return
|
||||
}
|
||||
|
||||
tree, err := s.builder.BuildCongestionTree(signedPoolTx, allPayments)
|
||||
if err != nil {
|
||||
round.Fail(fmt.Errorf("failed to create congestion tree: %s", err))
|
||||
log.WithError(err).Warn("failed to create congestion tree")
|
||||
return
|
||||
}
|
||||
|
||||
connectors, forfeitTxs, err := s.builder.BuildForfeitTxs(signedPoolTx, allPayments)
|
||||
if err != nil {
|
||||
round.Fail(fmt.Errorf("failed to create connectors and forfeit txs: %s", err))
|
||||
log.WithError(err).Warn("failed to create connectors and forfeit txs")
|
||||
return
|
||||
}
|
||||
|
||||
changes, _ := round.StartFinalization(connectors, forfeitTxs, tree, signedPoolTx)
|
||||
|
||||
if err := s.repoManager.Events().Save(ctx, changes...); err != nil {
|
||||
log.WithError(err).Warn("failed to store new round events")
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("started finalization stage for round: %s", round.Id)
|
||||
}
|
||||
|
||||
func (s *service) finalizeRound() {
|
||||
ctx := context.Background()
|
||||
round, err := s.repoManager.Rounds().GetCurrentRound(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("failed to retrieve current round")
|
||||
return
|
||||
}
|
||||
if round.IsFailed() {
|
||||
return
|
||||
}
|
||||
|
||||
txid, err := s.wallet.Transaction().BroadcastTransaction(ctx, round.TxHex)
|
||||
if err != nil {
|
||||
round.Fail(fmt.Errorf("failed to broadcast pool tx: %s", err))
|
||||
log.WithError(err).Warn("failed to broadcast pool tx")
|
||||
return
|
||||
}
|
||||
|
||||
changes, _ := round.EndFinalization(txid)
|
||||
if err := s.repoManager.Events().Save(ctx, changes...); err != nil {
|
||||
log.WithError(err).Warn("failed to store new round events")
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("finalized round %s with pool tx %s", round.Id, round.Txid)
|
||||
}
|
||||
49
internal/core/domain/events.go
Normal file
49
internal/core/domain/events.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package domain
|
||||
|
||||
type RoundEvent interface {
|
||||
isEvent()
|
||||
}
|
||||
|
||||
func (r RoundStarted) isEvent() {}
|
||||
func (r RoundFinalizationStarted) isEvent() {}
|
||||
func (r RoundFinalized) isEvent() {}
|
||||
func (r RoundFailed) isEvent() {}
|
||||
func (r InputsRegistered) isEvent() {}
|
||||
func (r OutputsRegistered) isEvent() {}
|
||||
|
||||
type RoundStarted struct {
|
||||
Id string
|
||||
Timestamp int64
|
||||
}
|
||||
|
||||
type RoundFinalizationStarted struct {
|
||||
Id string
|
||||
ForfeitTxs []string
|
||||
CongestionTree []string
|
||||
Connectors []string
|
||||
PoolTx string
|
||||
}
|
||||
|
||||
type RoundFinalized struct {
|
||||
Id string
|
||||
Txid string
|
||||
Timestamp int64
|
||||
}
|
||||
|
||||
type RoundFailed struct {
|
||||
Id string
|
||||
Err error
|
||||
Timestamp int64
|
||||
}
|
||||
|
||||
type InputsRegistered struct {
|
||||
Id string
|
||||
PaymentId string
|
||||
PaymentInputs []Vtxo
|
||||
}
|
||||
|
||||
type OutputsRegistered struct {
|
||||
Id string
|
||||
PaymentId string
|
||||
PaymentOutputs []Receiver
|
||||
}
|
||||
25
internal/core/domain/payment.go
Normal file
25
internal/core/domain/payment.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package domain
|
||||
|
||||
type Payment struct {
|
||||
Id string
|
||||
Inputs []Vtxo
|
||||
Receivers []Receiver
|
||||
}
|
||||
|
||||
func (p Payment) TotOutputAmount() uint64 {
|
||||
tot := uint64(0)
|
||||
for _, r := range p.Receivers {
|
||||
tot += r.Amount
|
||||
}
|
||||
return tot
|
||||
}
|
||||
|
||||
type Vtxo struct {
|
||||
Txid string
|
||||
VOut uint32
|
||||
}
|
||||
|
||||
type Receiver struct {
|
||||
Pubkey string
|
||||
Amount uint64
|
||||
}
|
||||
242
internal/core/domain/round.go
Normal file
242
internal/core/domain/round.go
Normal file
@@ -0,0 +1,242 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
UndefinedStage RoundStage = iota
|
||||
RegistrationStage
|
||||
FinalizationStage
|
||||
|
||||
dustAmount = 450
|
||||
)
|
||||
|
||||
type RoundStage int
|
||||
|
||||
func (s RoundStage) String() string {
|
||||
switch s {
|
||||
case RegistrationStage:
|
||||
return "REGISTRATION_STAGE"
|
||||
case FinalizationStage:
|
||||
return "FINALIZATION_STAGE"
|
||||
default:
|
||||
return "UNDEFINED_STAGE"
|
||||
}
|
||||
}
|
||||
|
||||
type Stage struct {
|
||||
Code RoundStage
|
||||
Ended bool
|
||||
Failed bool
|
||||
}
|
||||
|
||||
type Round struct {
|
||||
Id string
|
||||
StartingTimestamp int64
|
||||
EndingTimestamp int64
|
||||
Stage Stage
|
||||
Payments map[string]Payment
|
||||
Txid string
|
||||
TxHex string
|
||||
ForfeitTxs []string
|
||||
CongestionTree []string
|
||||
Connectors []string
|
||||
Version uint
|
||||
Changes []RoundEvent
|
||||
}
|
||||
|
||||
func NewRound() *Round {
|
||||
return &Round{
|
||||
Id: uuid.New().String(),
|
||||
Payments: make(map[string]Payment),
|
||||
Changes: make([]RoundEvent, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func NewRoundFromEvents(events []RoundEvent) *Round {
|
||||
r := &Round{}
|
||||
|
||||
for _, event := range events {
|
||||
r.On(event, true)
|
||||
}
|
||||
|
||||
r.Changes = append([]RoundEvent{}, events...)
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Round) On(event RoundEvent, replayed bool) {
|
||||
switch e := event.(type) {
|
||||
case RoundStarted:
|
||||
r.Stage.Code = RegistrationStage
|
||||
r.Id = e.Id
|
||||
r.StartingTimestamp = e.Timestamp
|
||||
case RoundFinalizationStarted:
|
||||
r.Stage.Code = FinalizationStage
|
||||
r.ForfeitTxs = append([]string{}, e.ForfeitTxs...)
|
||||
r.CongestionTree = append([]string{}, e.CongestionTree...)
|
||||
r.Connectors = append([]string{}, e.Connectors...)
|
||||
r.TxHex = e.PoolTx
|
||||
case RoundFinalized:
|
||||
r.Stage.Ended = true
|
||||
r.Txid = e.Txid
|
||||
r.EndingTimestamp = e.Timestamp
|
||||
case RoundFailed:
|
||||
r.Stage.Failed = true
|
||||
r.EndingTimestamp = e.Timestamp
|
||||
case InputsRegistered:
|
||||
if r.Payments == nil {
|
||||
r.Payments = make(map[string]Payment)
|
||||
}
|
||||
r.Payments[e.PaymentId] = Payment{
|
||||
Id: e.PaymentId,
|
||||
Inputs: e.PaymentInputs,
|
||||
}
|
||||
case OutputsRegistered:
|
||||
r.Payments[e.PaymentId] = Payment{
|
||||
Id: e.PaymentId,
|
||||
Inputs: r.Payments[e.PaymentId].Inputs,
|
||||
Receivers: e.PaymentOutputs,
|
||||
}
|
||||
}
|
||||
|
||||
if replayed {
|
||||
r.Version++
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Round) StartRegistration() ([]RoundEvent, error) {
|
||||
empty := Stage{}
|
||||
if r.Stage != empty {
|
||||
return nil, fmt.Errorf("not in a valid stage to start payment registration")
|
||||
}
|
||||
|
||||
event := RoundStarted{
|
||||
Id: r.Id,
|
||||
Timestamp: time.Now().Unix(),
|
||||
}
|
||||
r.raise(event)
|
||||
|
||||
return []RoundEvent{event}, nil
|
||||
}
|
||||
|
||||
func (r *Round) StartFinalization(connectors, txs, tree []string, poolTx string) ([]RoundEvent, error) {
|
||||
if r.Stage.Code != RegistrationStage || r.IsFailed() {
|
||||
return nil, fmt.Errorf("not in a valid stage to start payment finalization")
|
||||
}
|
||||
|
||||
event := RoundFinalizationStarted{
|
||||
Id: r.Id,
|
||||
ForfeitTxs: txs,
|
||||
CongestionTree: tree,
|
||||
Connectors: connectors,
|
||||
PoolTx: poolTx,
|
||||
}
|
||||
r.raise(event)
|
||||
|
||||
return []RoundEvent{event}, nil
|
||||
}
|
||||
|
||||
func (r *Round) EndFinalization(txid string) ([]RoundEvent, error) {
|
||||
if r.Stage.Code != FinalizationStage || r.IsFailed() {
|
||||
return nil, fmt.Errorf("not in a valid stage to end payment finalization")
|
||||
}
|
||||
if r.Stage.Ended {
|
||||
return nil, fmt.Errorf("payment finalization already ended")
|
||||
}
|
||||
event := RoundFinalized{
|
||||
Id: r.Id,
|
||||
Txid: txid,
|
||||
Timestamp: time.Now().Unix(),
|
||||
}
|
||||
r.raise(event)
|
||||
|
||||
return []RoundEvent{event}, nil
|
||||
}
|
||||
|
||||
func (r *Round) Fail(err error) []RoundEvent {
|
||||
if r.Stage.Failed {
|
||||
return nil
|
||||
}
|
||||
event := RoundFailed{
|
||||
Id: r.Id,
|
||||
Err: err,
|
||||
Timestamp: time.Now().Unix(),
|
||||
}
|
||||
r.raise(event)
|
||||
|
||||
return []RoundEvent{event}
|
||||
}
|
||||
|
||||
func (r *Round) RegisterInputs(id string, ins []Vtxo) ([]RoundEvent, error) {
|
||||
if r.Stage.Code != RegistrationStage || r.IsFailed() {
|
||||
return nil, fmt.Errorf("not in a valid stage to register inputs")
|
||||
}
|
||||
if r.Stage.Ended {
|
||||
return nil, fmt.Errorf("payment registration already ended")
|
||||
}
|
||||
|
||||
event := InputsRegistered{
|
||||
Id: r.Id,
|
||||
PaymentId: id,
|
||||
PaymentInputs: ins,
|
||||
}
|
||||
r.raise(event)
|
||||
|
||||
return []RoundEvent{event}, nil
|
||||
}
|
||||
|
||||
func (r *Round) RegisterOutputs(id string, outs []Receiver) ([]RoundEvent, error) {
|
||||
if r.Stage.Code != RegistrationStage || r.IsFailed() {
|
||||
return nil, fmt.Errorf("not in a valid stage to register inputs")
|
||||
}
|
||||
if r.Stage.Ended {
|
||||
return nil, fmt.Errorf("payment registration already ended")
|
||||
}
|
||||
|
||||
event := OutputsRegistered{
|
||||
Id: r.Id,
|
||||
PaymentId: id,
|
||||
PaymentOutputs: outs,
|
||||
}
|
||||
r.raise(event)
|
||||
|
||||
return []RoundEvent{event}, nil
|
||||
}
|
||||
|
||||
func (r *Round) IsStarted() bool {
|
||||
empty := Stage{}
|
||||
return !r.IsFailed() && (r.Stage != empty && !r.IsEnded())
|
||||
}
|
||||
|
||||
func (r *Round) IsEnded() bool {
|
||||
return !r.IsFailed() && (r.Stage.Code == FinalizationStage && r.Stage.Ended)
|
||||
}
|
||||
|
||||
func (r *Round) IsFailed() bool {
|
||||
return r.Stage.Failed
|
||||
}
|
||||
|
||||
func (r *Round) TotInputAmount() uint64 {
|
||||
return uint64(len(r.Payments) * dustAmount)
|
||||
}
|
||||
|
||||
func (r *Round) TotOutputAmount() uint64 {
|
||||
tot := uint64(0)
|
||||
for _, p := range r.Payments {
|
||||
tot += p.TotOutputAmount()
|
||||
}
|
||||
return tot
|
||||
}
|
||||
|
||||
func (r *Round) raise(event RoundEvent) {
|
||||
if r.Changes == nil {
|
||||
r.Changes = make([]RoundEvent, 0)
|
||||
}
|
||||
r.Changes = append(r.Changes, event)
|
||||
r.On(event, false)
|
||||
}
|
||||
18
internal/core/domain/round_repo.go
Normal file
18
internal/core/domain/round_repo.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package domain
|
||||
|
||||
import "context"
|
||||
|
||||
type RoundEventRepository interface {
|
||||
Save(ctx context.Context, events ...RoundEvent) error
|
||||
Load(ctx context.Context, id string) (*Round, error)
|
||||
}
|
||||
|
||||
type RoundRepository interface {
|
||||
AddRound(ctx context.Context, round *Round) error
|
||||
GetCurrentRound(ctx context.Context) (*Round, error)
|
||||
GetRoundWithId(ctx, id string) (*Round, error)
|
||||
GetRoundWithTxid(ctx, txid string) (*Round, error)
|
||||
UpdateRound(
|
||||
ctx context.Context, id string, updateFn func(r *Round) (*Round, error),
|
||||
) error
|
||||
}
|
||||
8
internal/core/ports/repo_manager.go
Normal file
8
internal/core/ports/repo_manager.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package ports
|
||||
|
||||
import "github.com/ark-network/ark/internal/core/domain"
|
||||
|
||||
type RepoManager interface {
|
||||
Events() domain.RoundEventRepository
|
||||
Rounds() domain.RoundRepository
|
||||
}
|
||||
8
internal/core/ports/scheduler.go
Normal file
8
internal/core/ports/scheduler.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package ports
|
||||
|
||||
type SchedulerService interface {
|
||||
Start()
|
||||
Stop()
|
||||
|
||||
ScheduleTask(interval int64, immediate bool, task func())
|
||||
}
|
||||
9
internal/core/ports/tx_builder.go
Normal file
9
internal/core/ports/tx_builder.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package ports
|
||||
|
||||
import "github.com/ark-network/ark/internal/core/domain"
|
||||
|
||||
type TxBuilder interface {
|
||||
BuildPoolTx(wallet WalletService, payments []domain.Payment) (poolTx string, err error)
|
||||
BuildCongestionTree(poolTx string, payments []domain.Payment) (congestionTree []string, err error)
|
||||
BuildForfeitTxs(poolTx string, payments []domain.Payment) (connectors []string, forfeitTxs []string, err error)
|
||||
}
|
||||
@@ -108,7 +108,6 @@ type TxInput interface {
|
||||
}
|
||||
|
||||
type TxOutput interface {
|
||||
GetAsset() string
|
||||
GetAmount() uint64
|
||||
GetScript() string
|
||||
}
|
||||
|
||||
@@ -118,7 +118,6 @@ func (l outputList) toProto() []*pb.Output {
|
||||
list := make([]*pb.Output, 0, len(l))
|
||||
for _, out := range l {
|
||||
list = append(list, &pb.Output{
|
||||
Asset: out.GetAsset(),
|
||||
Amount: out.GetAmount(),
|
||||
Script: out.GetScript(),
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user