Move btc wallet syncing in background (#352)

* Move btc wallet syncing in bg

* Fix

* Fix e2e test

* Use neutrino in regtest for e2e

* Revert

* Fix
This commit is contained in:
Pietralberto Mazza
2024-10-17 16:09:19 +02:00
committed by GitHub
parent ea5fa22023
commit 03670c9c4b
9 changed files with 258 additions and 80 deletions

View File

@@ -12,9 +12,7 @@ services:
- ARK_ROUND_LIFETIME=20
- ARK_TX_BUILDER_TYPE=covenantless
- ARK_ESPLORA_URL=http://chopsticks:3000
- ARK_BITCOIND_RPC_USER=admin1
- ARK_BITCOIND_RPC_PASS=123
- ARK_BITCOIND_RPC_HOST=bitcoin:18443
- ARK_NEUTRINO_PEER=bitcoin:18444
- ARK_SCHEDULER_TYPE=block
- ARK_NO_TLS=true
- ARK_NO_MACAROONS=true

View File

@@ -13,6 +13,7 @@ var ErrNonFinalBIP68 = errors.New("non-final BIP68 sequence")
type WalletService interface {
BlockchainScanner
GetSyncedUpdate(ctx context.Context) <-chan struct{}
GenSeed(ctx context.Context) (string, error)
Create(ctx context.Context, seed, password string) error
Restore(ctx context.Context, seed, password string) error

View File

@@ -13,6 +13,16 @@ type mockedWallet struct {
mock.Mock
}
func (m *mockedWallet) GetSyncedUpdate(ctx context.Context) <-chan struct{} {
args := m.Called(ctx)
var res chan struct{}
if a := args.Get(0); a != nil {
res = a.(chan struct{})
}
return res
}
func (m *mockedWallet) GenSeed(ctx context.Context) (string, error) {
args := m.Called(ctx)

View File

@@ -13,6 +13,16 @@ type mockedWallet struct {
mock.Mock
}
func (m *mockedWallet) GetSyncedUpdate(ctx context.Context) <-chan struct{} {
args := m.Called(ctx)
var res chan struct{}
if a := args.Get(0); a != nil {
res = a.(chan struct{})
}
return res
}
func (m *mockedWallet) GenSeed(ctx context.Context) (string, error) {
args := m.Called(ctx)

View File

@@ -83,10 +83,14 @@ const (
)
var (
ErrWalletNotLoaded = fmt.Errorf("wallet not loaded, create or unlock it first")
p2wpkhKeyScope = waddrmgr.KeyScopeBIP0084
p2trKeyScope = waddrmgr.KeyScopeBIP0086
outputLockDuration = time.Minute
ErrNotLoaded = fmt.Errorf("wallet not loaded, create or unlock it first")
ErrNotSynced = fmt.Errorf("wallet still syncing, please retry later")
ErrNotReady = fmt.Errorf("wallet not ready, please init and wait for it to complete syncing")
ErrNotUnlocked = fmt.Errorf("wallet is locked, please unlock it to perform this operation")
ErrAlreadyInitialized = fmt.Errorf("wallet already initialized")
p2wpkhKeyScope = waddrmgr.KeyScopeBIP0084
p2trKeyScope = waddrmgr.KeyScopeBIP0086
outputLockDuration = time.Minute
)
// add additional chain API not supported by the chain.Interface type
@@ -110,6 +114,9 @@ type service struct {
// holds the data related to the ASP key used in Vtxo scripts
aspKeyAddr waddrmgr.ManagedPubKeyAddress
isSynced bool
syncedCh chan struct{}
}
// WithNeutrino creates a start a neutrino node using the provided service datadir
@@ -144,15 +151,6 @@ func WithNeutrino(initialPeer string, esploraURL string) WalletOption {
return err
}
if err := neutrinoSvc.Start(); err != nil {
return err
}
// wait for neutrino to sync
for !neutrinoSvc.IsCurrent() {
time.Sleep(1 * time.Second)
}
chainSrc := chain.NewNeutrinoClient(netParams, neutrinoSvc)
scanner := chain.NewNeutrinoClient(netParams, neutrinoSvc)
@@ -268,6 +266,7 @@ func NewService(cfg WalletConfig, options ...WalletOption) (ports.WalletService,
cfg: cfg,
watchedScriptsLock: sync.RWMutex{},
watchedScripts: make(map[string]struct{}),
syncedCh: make(chan struct{}),
}
for _, option := range options {
@@ -280,11 +279,14 @@ func NewService(cfg WalletConfig, options ...WalletOption) (ports.WalletService,
}
func (s *service) Close() {
if s.walletLoaded() {
if err := s.wallet.Stop(); err != nil {
log.WithError(err).Warn("failed to gracefully stop the wallet, forcing shutdown")
}
if s.isLoaded() {
s.wallet.InternalWallet().Stop()
}
s.chainSource.Stop()
}
func (s *service) GetSyncedUpdate(_ context.Context) <-chan struct{} {
return s.syncedCh
}
func (s *service) GenSeed(_ context.Context) (string, error) {
@@ -304,11 +306,11 @@ func (s *service) Restore(_ context.Context, seed, password string) error {
}
func (s *service) Unlock(_ context.Context, password string) error {
if !s.walletInitialized() {
if !s.isInitialized() {
return fmt.Errorf("wallet not initialized")
}
if !s.walletLoaded() {
if !s.isLoaded() {
pwd := []byte(password)
opt := btcwallet.LoaderWithLocalWalletDB(s.cfg.Datadir, false, time.Minute)
config := btcwallet.Config{
@@ -332,16 +334,6 @@ func (s *service) Unlock(_ context.Context, password string) error {
return fmt.Errorf("failed to start wallet: %s", err)
}
for {
if !wallet.InternalWallet().ChainSynced() {
log.Debugf("waiting sync: current height %d", wallet.InternalWallet().Manager.SyncedTo().Height)
time.Sleep(3 * time.Second)
continue
}
break
}
log.Debugf("chain synced")
addrs, err := wallet.ListAddresses(string(aspKeyAccount), false)
if err != nil {
return err
@@ -381,14 +373,18 @@ func (s *service) Unlock(_ context.Context, password string) error {
}
s.wallet = wallet
go s.listenToSynced()
return nil
}
return s.wallet.InternalWallet().Unlock([]byte(password), nil)
}
func (s *service) Lock(_ context.Context, _ string) error {
if !s.walletLoaded() {
return ErrWalletNotLoaded
if !s.isLoaded() {
return ErrNotLoaded
}
s.wallet.InternalWallet().Lock()
@@ -404,14 +400,15 @@ func (s *service) BroadcastTransaction(ctx context.Context, txHex string) (strin
if err := tx.Deserialize(hex.NewDecoder(strings.NewReader(txHex))); err != nil {
return "", err
}
if err := s.wallet.PublishTransaction(&tx, ""); err != nil {
return "", err
}
return tx.TxHash().String(), nil
}
func (s *service) ConnectorsAccountBalance(ctx context.Context) (uint64, uint64, error) {
if err := s.safeCheck(); err != nil {
return 0, 0, err
}
utxos, err := s.listUtxos(p2trKeyScope)
if err != nil {
return 0, 0, err
@@ -426,6 +423,10 @@ func (s *service) ConnectorsAccountBalance(ctx context.Context) (uint64, uint64,
}
func (s *service) MainAccountBalance(ctx context.Context) (uint64, uint64, error) {
if err := s.safeCheck(); err != nil {
return 0, 0, err
}
utxos, err := s.listUtxos(p2wpkhKeyScope)
if err != nil {
return 0, 0, err
@@ -440,8 +441,11 @@ func (s *service) MainAccountBalance(ctx context.Context) (uint64, uint64, error
}
func (s *service) DeriveAddresses(ctx context.Context, num int) ([]string, error) {
addresses := make([]string, 0, num)
if err := s.safeCheck(); err != nil {
return nil, err
}
addresses := make([]string, 0, num)
for i := 0; i < num; i++ {
addr, err := s.deriveNextAddress()
if err != nil {
@@ -459,6 +463,10 @@ func (s *service) DeriveAddresses(ctx context.Context, num int) ([]string, error
}
func (s *service) DeriveConnectorAddress(ctx context.Context) (string, error) {
if err := s.safeCheck(); err != nil {
return "", err
}
addr, err := s.wallet.NewAddress(lnwallet.TaprootPubkey, false, string(connectorAccount))
if err != nil {
return "", err
@@ -468,10 +476,17 @@ func (s *service) DeriveConnectorAddress(ctx context.Context) (string, error) {
}
func (s *service) GetPubkey(ctx context.Context) (*secp256k1.PublicKey, error) {
if !s.isLoaded() {
return nil, ErrNotLoaded
}
return s.aspKeyAddr.PubKey(), nil
}
func (s *service) GetForfeitAddress(ctx context.Context) (string, error) {
if err := s.safeCheck(); err != nil {
return "", err
}
addrs, err := s.wallet.ListAddresses(string(mainAccount), false)
if err != nil {
return "", err
@@ -512,6 +527,10 @@ func (s *service) GetForfeitAddress(ctx context.Context) (string, error) {
}
func (s *service) ListConnectorUtxos(ctx context.Context, connectorAddress string) ([]ports.TxInput, error) {
if err := s.safeCheck(); err != nil {
return nil, err
}
w := s.wallet.InternalWallet()
addr, err := btcutil.DecodeAddress(connectorAddress, w.ChainParams())
@@ -542,6 +561,10 @@ func (s *service) ListConnectorUtxos(ctx context.Context, connectorAddress strin
}
func (s *service) LockConnectorUtxos(ctx context.Context, utxos []ports.TxOutpoint) error {
if err := s.safeCheck(); err != nil {
return err
}
w := s.wallet.InternalWallet()
for _, utxo := range utxos {
@@ -562,6 +585,10 @@ func (s *service) LockConnectorUtxos(ctx context.Context, utxos []ports.TxOutpoi
}
func (s *service) SelectUtxos(ctx context.Context, _ string, amount uint64) ([]ports.TxInput, uint64, error) {
if err := s.safeCheck(); err != nil {
return nil, 0, err
}
w := s.wallet.InternalWallet()
utxos, err := s.listUtxos(p2wpkhKeyScope)
@@ -616,6 +643,10 @@ func (s *service) SelectUtxos(ctx context.Context, _ string, amount uint64) ([]p
}
func (s *service) SignTransaction(ctx context.Context, partialTx string, extractRawTx bool) (string, error) {
if err := s.safeCheck(); err != nil {
return "", err
}
ptx, err := psbt.NewFromRawBytes(
strings.NewReader(partialTx),
true,
@@ -701,6 +732,10 @@ func (s *service) SignTransaction(ctx context.Context, partialTx string, extract
}
func (s *service) SignTransactionTapscript(ctx context.Context, partialTx string, inputIndexes []int) (string, error) {
if err := s.safeCheck(); err != nil {
return "", err
}
partial, err := psbt.NewFromRawBytes(
strings.NewReader(partialTx),
true,
@@ -739,9 +774,9 @@ func (s *service) SignTransactionTapscript(ctx context.Context, partialTx string
}
func (s *service) Status(ctx context.Context) (ports.WalletStatus, error) {
if !s.walletLoaded() {
if !s.isLoaded() {
return status{
initialized: s.walletInitialized(),
initialized: s.isInitialized(),
}, nil
}
@@ -754,6 +789,10 @@ func (s *service) Status(ctx context.Context) (ports.WalletStatus, error) {
}
func (s *service) WaitForSync(ctx context.Context, txid string) error {
if err := s.safeCheck(); err != nil {
return err
}
w := s.wallet.InternalWallet()
txhash, err := chainhash.NewHashFromStr(txid)
@@ -869,6 +908,10 @@ func (s *service) EstimateFees(ctx context.Context, partialTx string) (uint64, e
}
func (s *service) WatchScripts(ctx context.Context, scripts []string) error {
if !s.isSynced {
return ErrNotSynced
}
addresses := make([]btcutil.Address, 0, len(scripts))
for _, script := range scripts {
@@ -904,6 +947,10 @@ func (s *service) WatchScripts(ctx context.Context, scripts []string) error {
}
func (s *service) UnwatchScripts(ctx context.Context, scripts []string) error {
if !s.isSynced {
return ErrNotSynced
}
s.watchedScriptsLock.Lock()
defer s.watchedScriptsLock.Unlock()
for _, script := range scripts {
@@ -1014,6 +1061,10 @@ func (s *service) castNotification(tx *wtxmgr.TxRecord) map[string][]ports.VtxoW
}
func (s *service) create(mnemonic, password string, addrGap uint32) error {
if s.isInitialized() {
return ErrAlreadyInitialized
}
if len(mnemonic) <= 0 {
return fmt.Errorf("missing hd seed")
}
@@ -1057,24 +1108,62 @@ func (s *service) create(mnemonic, password string, addrGap uint32) error {
return fmt.Errorf("failed to start wallet: %s", err)
}
for {
if !wallet.InternalWallet().ChainSynced() {
log.Debugf("waiting sync: current height %d", wallet.InternalWallet().Manager.SyncedTo().Height)
time.Sleep(3 * time.Second)
continue
}
break
}
log.Debugf("chain synced")
if err := s.initAspKeyAddress(wallet); err != nil {
return err
}
s.wallet = wallet
go s.listenToSynced()
return nil
}
func (s *service) listenToSynced() {
counter := 0
for {
if s.wallet.InternalWallet().ChainSynced() {
log.Debug("wallet: syncing completed")
s.isSynced = true
s.syncedCh <- struct{}{}
return
}
isRestore, progress, err := s.wallet.GetRecoveryInfo()
if err != nil {
log.Warnf("wallet: failed to check if wallet is synced: %s", err)
} else {
if !isRestore {
if counter%6 == 0 {
log.Debug("wallet: syncing in progress...")
}
counter++
} else {
switch progress {
case 0:
// nolint: all
if counter%6 == 0 {
_, bestBlock, _ := s.wallet.IsSynced()
if bestBlock > 0 {
log.Debugf("wallet: waiting for chain source to be synced, last block fetched: %s", time.Unix(bestBlock, 0))
}
}
counter++
case 1:
log.Debug("wallet: syncing completed")
s.isSynced = true
s.syncedCh <- struct{}{}
return
default:
log.Debugf("wallet: syncing progress %.0f%%", progress*100)
}
}
}
time.Sleep(10 * time.Second)
}
}
// initAspKeyAccount creates the asp key account if it doesn't exist
func (s *service) initAspKeyAccount(wallet *btcwallet.BtcWallet) error {
w := wallet.InternalWallet()
@@ -1173,18 +1262,31 @@ func (s *service) initAspKeyAddress(wallet *btcwallet.BtcWallet) error {
}
func (s *service) deriveNextAddress() (btcutil.Address, error) {
if !s.walletLoaded() {
return nil, ErrWalletNotLoaded
if !s.isLoaded() {
return nil, ErrNotLoaded
}
return s.wallet.NewAddress(lnwallet.WitnessPubKey, false, string(mainAccount))
}
func (s *service) walletLoaded() bool {
func (s *service) safeCheck() error {
if !s.isLoaded() {
if s.isInitialized() {
return ErrNotUnlocked
}
return ErrNotReady
}
if !s.isSynced {
return ErrNotSynced
}
return nil
}
func (s *service) isLoaded() bool {
return s.wallet != nil
}
func (s *service) walletInitialized() bool {
func (s *service) isInitialized() bool {
opts := []btcwallet.LoaderOption{btcwallet.LoaderWithLocalWalletDB(s.cfg.Datadir, false, time.Minute)}
loader, err := btcwallet.NewWalletLoader(
s.cfg.chainParams(), 0, opts...,
@@ -1241,10 +1343,6 @@ func withChainSource(chainSource chain.Interface) WalletOption {
return fmt.Errorf("chain source already set")
}
if err := chainSource.Start(); err != nil {
return fmt.Errorf("failed to start chain source: %s", err)
}
s.chainSource = chainSource
return nil
}

View File

@@ -24,6 +24,7 @@ type service struct {
notifyClient pb.NotificationServiceClient
chVtxos chan map[string][]ports.VtxoWithValue
isListening bool
syncedCh chan struct{}
}
func NewService(addr string) (ports.WalletService, error) {
@@ -44,6 +45,7 @@ func NewService(addr string) (ports.WalletService, error) {
txClient: txClient,
notifyClient: notifyClient,
chVtxos: chVtxos,
syncedCh: make(chan struct{}),
}
ctx := context.Background()
@@ -65,6 +67,10 @@ func (s *service) Close() {
s.conn.Close()
}
func (s *service) GetSyncedUpdate(_ context.Context) <-chan struct{} {
return s.syncedCh
}
func (s *service) GenSeed(ctx context.Context) (string, error) {
res, err := s.walletClient.GenSeed(ctx, &pb.GenSeedRequest{})
if err != nil {

View File

@@ -6,18 +6,22 @@ import (
arkv1 "github.com/ark-network/ark/api-spec/protobuf/gen/ark/v1"
"github.com/ark-network/ark/server/internal/core/ports"
log "github.com/sirupsen/logrus"
)
type walletInitHandler struct {
walletService ports.WalletService
onInit func(password string)
onUnlock func(password string)
onReady func()
}
func NewWalletInitializerHandler(
walletService ports.WalletService, onInit, onUnlock func(string),
walletService ports.WalletService, onInit, onUnlock func(string), onReady func(),
) arkv1.WalletInitializerServiceServer {
return &walletInitHandler{walletService, onInit, onUnlock}
svc := walletInitHandler{walletService, onInit, onUnlock, onReady}
go svc.listenWhenReady()
return &svc
}
func (a *walletInitHandler) GenSeed(ctx context.Context, _ *arkv1.GenSeedRequest) (*arkv1.GenSeedResponse, error) {
@@ -77,6 +81,17 @@ func (a *walletInitHandler) Unlock(ctx context.Context, req *arkv1.UnlockRequest
go a.onUnlock(req.GetPassword())
go func() {
status, err := a.walletService.Status(context.Background())
if err != nil {
log.Warnf("failed to get wallet status: %s", err)
return
}
if status.IsUnlocked() && status.IsSynced() {
a.onReady()
}
}()
return &arkv1.UnlockResponse{}, nil
}
@@ -93,6 +108,19 @@ func (a *walletInitHandler) GetStatus(ctx context.Context, _ *arkv1.GetStatusReq
}, nil
}
func (a *walletInitHandler) listenWhenReady() {
ctx := context.Background()
<-a.walletService.GetSyncedUpdate(ctx)
status, err := a.walletService.Status(ctx)
if err != nil {
log.Warnf("failed to get wallet status: %s", err)
}
if status.IsUnlocked() && status.IsSynced() {
a.onReady()
}
}
type walletHandler struct {
walletService ports.WalletService
}

View File

@@ -183,7 +183,7 @@ func (s *service) newServer(tlsConfig *tls.Config, withAppSvc bool) error {
arkv1.RegisterWalletServiceServer(grpcServer, walletHandler)
walletInitHandler := handlers.NewWalletInitializerHandler(
s.appConfig.WalletService(), s.onInit, s.onUnlock,
s.appConfig.WalletService(), s.onInit, s.onUnlock, s.onReady,
)
arkv1.RegisterWalletInitializerServiceServer(grpcServer, walletInitHandler)
@@ -271,14 +271,6 @@ func (s *service) newServer(tlsConfig *tls.Config, withAppSvc bool) error {
}
func (s *service) onUnlock(password string) {
withoutAppSvc := false
s.stop(withoutAppSvc)
withAppSvc := true
if err := s.start(withAppSvc); err != nil {
panic(err)
}
if s.config.NoMacaroons {
return
}
@@ -320,6 +312,16 @@ func (s *service) onInit(password string) {
log.Debugf("generated macaroons at path %s", datadir)
}
func (s *service) onReady() {
withoutAppSvc := false
s.stop(withoutAppSvc)
withAppSvc := true
if err := s.start(withAppSvc); err != nil {
panic(err)
}
}
func (s *service) autoUnlock() error {
ctx := context.Background()
wallet := s.appConfig.WalletService()

View File

@@ -445,25 +445,50 @@ func setupAspWallet() error {
return fmt.Errorf("failed to unlock wallet: %s", err)
}
time.Sleep(time.Second)
req, err = http.NewRequest("GET", "http://localhost:7070/v1/admin/wallet/address", nil)
if err != nil {
return fmt.Errorf("failed to prepare new address request: %s", err)
var status struct {
Initialized bool `json:"initialized"`
Unlocked bool `json:"unlocked"`
Synced bool `json:"synced"`
}
req.Header.Set("Authorization", "Basic YWRtaW46YWRtaW4=")
for {
time.Sleep(time.Second)
resp, err := adminHttpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to get new address: %s", err)
req, err := http.NewRequest("GET", "http://localhost:7070/v1/admin/wallet/status", nil)
if err != nil {
return fmt.Errorf("failed to prepare status request: %s", err)
}
resp, err := adminHttpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to get status: %s", err)
}
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
return fmt.Errorf("failed to parse status response: %s", err)
}
if status.Initialized && status.Unlocked && status.Synced {
break
}
}
var addr struct {
Address string `json:"address"`
}
for addr.Address == "" {
time.Sleep(time.Second)
if err := json.NewDecoder(resp.Body).Decode(&addr); err != nil {
return fmt.Errorf("failed to parse response: %s", err)
req, err = http.NewRequest("GET", "http://localhost:7070/v1/admin/wallet/address", nil)
if err != nil {
return fmt.Errorf("failed to prepare new address request: %s", err)
}
req.Header.Set("Authorization", "Basic YWRtaW46YWRtaW4=")
resp, err := adminHttpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to get new address: %s", err)
}
if err := json.NewDecoder(resp.Body).Decode(&addr); err != nil {
return fmt.Errorf("failed to parse response: %s", err)
}
}
const numberOfFaucet = 15 // must cover the liquidity needed for all tests