mirror of
https://github.com/aljazceru/ark.git
synced 2025-12-18 12:44:19 +01:00
Detect redemptions and mark vtxos has redeemed (#87)
* Update ocean protos & Add blockchain scanner * Detect redemptions and mark vtxos as redeemed * Update comment * Fixes * Fix watched event type * Fixes * Fixes * Restore watching vtxos at startup * Update deps
This commit is contained in:
committed by
GitHub
parent
b2e034cf0e
commit
940214f699
@@ -47,18 +47,20 @@ type service struct {
|
||||
onchainNework network.Network
|
||||
pubkey *secp256k1.PublicKey
|
||||
|
||||
wallet ports.WalletService
|
||||
repoManager ports.RepoManager
|
||||
builder ports.TxBuilder
|
||||
wallet ports.WalletService
|
||||
repoManager ports.RepoManager
|
||||
builder ports.TxBuilder
|
||||
scanner ports.BlockchainScanner
|
||||
|
||||
paymentRequests *paymentsMap
|
||||
forfeitTxs *forfeitTxsMap
|
||||
|
||||
eventsCh chan domain.RoundEvent
|
||||
eventsCh chan domain.RoundEvent
|
||||
}
|
||||
|
||||
func NewService(
|
||||
interval int64, network common.Network, onchainNetwork network.Network,
|
||||
walletSvc ports.WalletService, repoManager ports.RepoManager, builder ports.TxBuilder,
|
||||
walletSvc ports.WalletService, repoManager ports.RepoManager,
|
||||
builder ports.TxBuilder, scanner ports.BlockchainScanner,
|
||||
minRelayFee uint64,
|
||||
) (Service, error) {
|
||||
eventsCh := make(chan domain.RoundEvent)
|
||||
@@ -72,7 +74,7 @@ func NewService(
|
||||
}
|
||||
svc := &service{
|
||||
minRelayFee, interval, network, onchainNetwork, pubkey,
|
||||
walletSvc, repoManager, builder, paymentRequests, forfeitTxs,
|
||||
walletSvc, repoManager, builder, scanner, paymentRequests, forfeitTxs,
|
||||
eventsCh,
|
||||
}
|
||||
repoManager.RegisterEventsHandler(
|
||||
@@ -81,15 +83,29 @@ func NewService(
|
||||
svc.propagateEvents(round)
|
||||
},
|
||||
)
|
||||
|
||||
if err := svc.restoreWatchingVtxos(); err != nil {
|
||||
return nil, fmt.Errorf("failed to restore watching vtxos: %s", err)
|
||||
}
|
||||
go svc.listenToRedemptions()
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
func (s *service) Start() error {
|
||||
log.Debug("starting app service")
|
||||
return s.start()
|
||||
go s.start()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) Stop() {
|
||||
// nolint
|
||||
vtxos, _ := s.repoManager.Vtxos().GetSpendableVtxos(
|
||||
context.Background(), "",
|
||||
)
|
||||
if len(vtxos) > 0 {
|
||||
s.stopWatchingVtxos(vtxos)
|
||||
}
|
||||
|
||||
s.wallet.Close()
|
||||
log.Debug("closed connection to wallet")
|
||||
s.repoManager.Close()
|
||||
@@ -177,7 +193,7 @@ func (s *service) SignVtxos(ctx context.Context, forfeitTxs []string) error {
|
||||
|
||||
func (s *service) ListVtxos(ctx context.Context, pubkey *secp256k1.PublicKey) ([]domain.Vtxo, error) {
|
||||
pk := hex.EncodeToString(pubkey.SerializeCompressed())
|
||||
return s.repoManager.Vtxos().GetSpendableVtxosWithPubkey(ctx, pk)
|
||||
return s.repoManager.Vtxos().GetSpendableVtxos(ctx, pk)
|
||||
}
|
||||
|
||||
func (s *service) GetEventsChannel(ctx context.Context) <-chan domain.RoundEvent {
|
||||
@@ -196,9 +212,8 @@ func (s *service) GetPubkey(ctx context.Context) (string, error) {
|
||||
return pubkey, nil
|
||||
}
|
||||
|
||||
func (s *service) start() error {
|
||||
func (s *service) start() {
|
||||
s.startRound()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) startRound() {
|
||||
@@ -345,6 +360,28 @@ func (s *service) finalizeRound() {
|
||||
log.Debugf("finalized round %s with pool tx %s", round.Id, round.Txid)
|
||||
}
|
||||
|
||||
func (s *service) listenToRedemptions() {
|
||||
ctx := context.Background()
|
||||
chVtxos := s.scanner.GetNotificationChannel(ctx)
|
||||
for vtxoKeys := range chVtxos {
|
||||
if len(vtxoKeys) > 0 {
|
||||
for {
|
||||
// TODO: make sure that the vtxos haven't been already spent, otherwise
|
||||
// broadcast the corresponding forfeit tx and connector to prevent
|
||||
// getting cheated.
|
||||
vtxos, err := s.repoManager.Vtxos().RedeemVtxos(ctx, vtxoKeys)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("failed to redeem vtxos, retrying...")
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
log.Debugf("redeemed %d vtxos", len(vtxos))
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) updateProjectionStore(round *domain.Round) {
|
||||
ctx := context.Background()
|
||||
lastChange := round.Events()[len(round.Events())-1]
|
||||
@@ -374,6 +411,19 @@ func (s *service) updateProjectionStore(round *domain.Round) {
|
||||
log.Debugf("added %d new vtxos", len(newVtxos))
|
||||
break
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
if err := s.startWatchingVtxos(newVtxos); err != nil {
|
||||
log.WithError(err).Warn(
|
||||
"failed to start watching vtxos, retrying in a moment...",
|
||||
)
|
||||
continue
|
||||
}
|
||||
log.Debugf("started watching %d vtxos", len(newVtxos))
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Always update the status of the round.
|
||||
@@ -419,7 +469,7 @@ func (s *service) getNewVtxos(round *domain.Round) []domain.Vtxo {
|
||||
|
||||
buf, _ := hex.DecodeString(r.Pubkey)
|
||||
pk, _ := secp256k1.ParsePubKey(buf)
|
||||
script, _ := s.builder.GetVtxoOutputScript(pk, s.pubkey)
|
||||
script, _ := s.builder.GetVtxoScript(pk, s.pubkey)
|
||||
if bytes.Equal(script, out.Script) {
|
||||
found = true
|
||||
pubkey = r.Pubkey
|
||||
@@ -440,6 +490,77 @@ func (s *service) getNewVtxos(round *domain.Round) []domain.Vtxo {
|
||||
return vtxos
|
||||
}
|
||||
|
||||
func (s *service) startWatchingVtxos(vtxos []domain.Vtxo) error {
|
||||
scripts, err := s.extractVtxosScripts(vtxos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.scanner.WatchScripts(context.Background(), scripts)
|
||||
}
|
||||
|
||||
func (s *service) stopWatchingVtxos(vtxos []domain.Vtxo) {
|
||||
scripts, err := s.extractVtxosScripts(vtxos)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("failed to extract scripts from vtxos")
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
if err := s.scanner.UnwatchScripts(context.Background(), scripts); err != nil {
|
||||
log.WithError(err).Warn("failed to stop watching vtxos, retrying in a moment...")
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
log.Debugf("stopped watching %d vtxos", len(vtxos))
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) restoreWatchingVtxos() error {
|
||||
vtxos, err := s.repoManager.Vtxos().GetSpendableVtxos(
|
||||
context.Background(), "",
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(vtxos) <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := s.startWatchingVtxos(vtxos); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("restored watching %d vtxos", len(vtxos))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) extractVtxosScripts(vtxos []domain.Vtxo) ([]string, error) {
|
||||
indexedScripts := make(map[string]struct{})
|
||||
for _, vtxo := range vtxos {
|
||||
buf, err := hex.DecodeString(vtxo.Pubkey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
userPubkey, err := secp256k1.ParsePubKey(buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
script, err := s.builder.GetVtxoScript(userPubkey, s.pubkey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
indexedScripts[hex.EncodeToString(script)] = struct{}{}
|
||||
}
|
||||
scripts := make([]string, 0, len(indexedScripts))
|
||||
for script := range indexedScripts {
|
||||
scripts = append(scripts, script)
|
||||
}
|
||||
return scripts, nil
|
||||
}
|
||||
|
||||
func getSpentVtxos(payments map[string]domain.Payment) []domain.VtxoKey {
|
||||
vtxos := make([]domain.VtxoKey, 0)
|
||||
for _, p := range payments {
|
||||
|
||||
Reference in New Issue
Block a user