diff --git a/fundingmanager.go b/fundingmanager.go index 1aaf8c66..d09d2b12 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -105,7 +105,7 @@ var ( // * deadlines, etc. type reservationWithCtx struct { reservation *lnwallet.ChannelReservation - peerAddress *lnwire.NetAddress + peer lnpeer.Peer chanAmt btcutil.Amount @@ -152,7 +152,7 @@ func (r *reservationWithCtx) updateTimestamp() { // embedded within this message giving the funding manager full context w.r.t // the workflow. type initFundingMsg struct { - peerAddress *lnwire.NetAddress + peer lnpeer.Peer *openChanReq } @@ -160,40 +160,40 @@ type initFundingMsg struct { // the message. This allows the funding manager to queue a response directly to // the peer, progressing the funding workflow. type fundingOpenMsg struct { - msg *lnwire.OpenChannel - peerAddress *lnwire.NetAddress + msg *lnwire.OpenChannel + peer lnpeer.Peer } // fundingAcceptMsg couples an lnwire.AcceptChannel message with the peer who // sent the message. This allows the funding manager to queue a response // directly to the peer, progressing the funding workflow. type fundingAcceptMsg struct { - msg *lnwire.AcceptChannel - peerAddress *lnwire.NetAddress + msg *lnwire.AcceptChannel + peer lnpeer.Peer } // fundingCreatedMsg couples an lnwire.FundingCreated message with the peer who // sent the message. This allows the funding manager to queue a response // directly to the peer, progressing the funding workflow. type fundingCreatedMsg struct { - msg *lnwire.FundingCreated - peerAddress *lnwire.NetAddress + msg *lnwire.FundingCreated + peer lnpeer.Peer } // fundingSignedMsg couples an lnwire.FundingSigned message with the peer who // sent the message. This allows the funding manager to queue a response // directly to the peer, progressing the funding workflow. type fundingSignedMsg struct { - msg *lnwire.FundingSigned - peerAddress *lnwire.NetAddress + msg *lnwire.FundingSigned + peer lnpeer.Peer } // fundingLockedMsg couples an lnwire.FundingLocked message with the peer who // sent the message. This allows the funding manager to finalize the funding // process and announce the existence of the new channel. type fundingLockedMsg struct { - msg *lnwire.FundingLocked - peerAddress *lnwire.NetAddress + msg *lnwire.FundingLocked + peer lnpeer.Peer } // fundingErrorMsg couples an lnwire.Error message with the peer who sent the @@ -270,10 +270,12 @@ type fundingConfig struct { SendToPeer func(target *btcec.PublicKey, msgs ...lnwire.Message) error // NotifyWhenOnline allows the FundingManager to register with a - // subsystem that will notify it when the peer comes online. - // This is used when sending the fundingLocked message, since it MUST be + // subsystem that will notify it when the peer comes online. This is + // used when sending the fundingLocked message, since it MUST be // delivered after the funding transaction is confirmed. - NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{}) + // + // NOTE: The peerChan channel must be buffered. + NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) // FindPeer searches the list of peers connected to the node so that // the FundingManager can notify other daemon subsystems as necessary @@ -827,13 +829,13 @@ func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) { // // TODO(roasbeef): if peer disconnects, and haven't yet broadcast funding // transaction, then all reservations should be cleared. -func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, - tempChanID [32]byte, fundingErr error) { +func (f *fundingManager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte, + fundingErr error) { fndgLog.Debugf("Failing funding flow for pendingID=%x: %v", tempChanID, fundingErr) - ctx, err := f.cancelReservationCtx(peer, tempChanID) + ctx, err := f.cancelReservationCtx(peer.IdentityKey(), tempChanID) if err != nil { fndgLog.Errorf("unable to cancel reservation: %v", err) } @@ -868,8 +870,8 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, } fndgLog.Debugf("Sending funding error to peer (%x): %v", - peer.SerializeCompressed(), spew.Sdump(errMsg)) - if err := f.cfg.SendToPeer(peer, errMsg); err != nil { + peer.IdentityKey().SerializeCompressed(), spew.Sdump(errMsg)) + if err := peer.SendMessage(false, errMsg); err != nil { fndgLog.Errorf("unable to send error message to peer %v", err) } } @@ -950,10 +952,10 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { // processFundingOpen sends a message to the fundingManager allowing it to // initiate the new funding workflow with the source peer. func (f *fundingManager) processFundingOpen(msg *lnwire.OpenChannel, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingOpenMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingOpenMsg{msg, peer}: case <-f.quit: return } @@ -969,7 +971,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // Check number of pending channels to be smaller than maximum allowed // number and send ErrorGeneric to remote peer if condition is // violated. - peerIDKey := newSerializedKey(fmsg.peerAddress.IdentityKey) + peerIDKey := newSerializedKey(fmsg.peer.IdentityKey()) msg := fmsg.msg amt := msg.FundingAmount @@ -980,8 +982,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { if len(f.activeReservations[peerIDKey]) >= cfg.MaxPendingChannels { f.resMtx.RUnlock() f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, - lnwire.ErrMaxPendingChannels) + fmsg.peer, fmsg.msg.PendingChannelID, + lnwire.ErrMaxPendingChannels, + ) return } f.resMtx.RUnlock() @@ -995,8 +998,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { fndgLog.Errorf("unable to query wallet: %v", err) } f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, - lnwire.ErrSynchronizingChain) + fmsg.peer, fmsg.msg.PendingChannelID, + lnwire.ErrSynchronizingChain, + ) return } @@ -1004,7 +1008,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // current soft-limit for channel size. if msg.FundingAmount > maxFundingAmount { f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, + fmsg.peer, fmsg.msg.PendingChannelID, lnwire.ErrChanTooLarge, ) return @@ -1014,7 +1018,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // a channel that's below our current min channel size. if amt < f.cfg.MinChanSize { f.failFundingFlow( - fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID, + fmsg.peer, fmsg.msg.PendingChannelID, lnwallet.ErrChanTooSmall(amt, btcutil.Amount(f.cfg.MinChanSize)), ) return @@ -1023,7 +1027,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+ "pendingId=%x) from peer(%x)", amt, msg.PushAmount, msg.CsvDelay, msg.PendingChannelID, - fmsg.peerAddress.IdentityKey.SerializeCompressed()) + fmsg.peer.IdentityKey().SerializeCompressed()) // Attempt to initialize a reservation within the wallet. If the wallet // has insufficient resources to create the channel, then the @@ -1034,13 +1038,12 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { reservation, err := f.cfg.Wallet.InitChannelReservation( amt, 0, msg.PushAmount, lnwallet.SatPerKWeight(msg.FeePerKiloWeight), 0, - fmsg.peerAddress.IdentityKey, fmsg.peerAddress.Address, + fmsg.peer.IdentityKey(), fmsg.peer.Address(), &chainHash, msg.ChannelFlags, ) if err != nil { fndgLog.Errorf("Unable to initialize reservation: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } @@ -1060,9 +1063,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { ) if err != nil { fndgLog.Errorf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - fmsg.msg.PendingChannelID, err, - ) + f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) return } @@ -1090,7 +1091,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { remoteCsvDelay: remoteCsvDelay, remoteMinHtlc: minHtlc, err: make(chan error, 1), - peerAddress: fmsg.peerAddress, + peer: fmsg.peer, } f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx f.resMtx.Unlock() @@ -1132,8 +1133,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { err = reservation.ProcessSingleContribution(remoteContribution) if err != nil { fndgLog.Errorf("unable to add contribution reservation: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } @@ -1161,11 +1161,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { HtlcPoint: ourContribution.HtlcBasePoint.PubKey, FirstCommitmentPoint: ourContribution.FirstCommitmentPoint, } - err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, &fundingAccept) - if err != nil { + if err := fmsg.peer.SendMessage(false, &fundingAccept); err != nil { fndgLog.Errorf("unable to send funding response to peer: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } } @@ -1173,10 +1171,10 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // processFundingAccept sends a message to the fundingManager allowing it to // continue the second phase of a funding workflow with the target peer. func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingAcceptMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingAcceptMsg{msg, peer}: case <-f.quit: return } @@ -1188,7 +1186,7 @@ func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel, func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { msg := fmsg.msg pendingChanID := fmsg.msg.PendingChannelID - peerKey := fmsg.peerAddress.IdentityKey + peerKey := fmsg.peer.IdentityKey() resCtx, err := f.getReservationCtx(peerKey, pendingChanID) if err != nil { @@ -1212,8 +1210,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { ) if err != nil { fndgLog.Warnf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - fmsg.msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) return } @@ -1259,9 +1256,8 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { err = resCtx.reservation.ProcessContribution(remoteContribution) if err != nil { fndgLog.Errorf("Unable to process contribution from %v: %v", - fmsg.peerAddress.IdentityKey, err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + peerKey, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } @@ -1304,15 +1300,12 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { fundingCreated.CommitSig, err = lnwire.NewSigFromRawSignature(sig) if err != nil { fndgLog.Errorf("Unable to parse signature: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } - err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingCreated) - if err != nil { + if err := fmsg.peer.SendMessage(false, fundingCreated); err != nil { fndgLog.Errorf("Unable to send funding complete message: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - msg.PendingChannelID, err) + f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) return } } @@ -1320,10 +1313,10 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { // processFundingCreated queues a funding complete message coupled with the // source peer to the fundingManager. func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingCreatedMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingCreatedMsg{msg, peer}: case <-f.quit: return } @@ -1334,7 +1327,7 @@ func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated, // processed, a signature is sent to the remote peer allowing it to broadcast // the funding transaction, progressing the workflow into the final stage. func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { - peerKey := fmsg.peerAddress.IdentityKey + peerKey := fmsg.peer.IdentityKey() pendingChanID := fmsg.msg.PendingChannelID resCtx, err := f.getReservationCtx(peerKey, pendingChanID) @@ -1364,8 +1357,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { if err != nil { // TODO(roasbeef): better error logging: peerID, channelID, etc. fndgLog.Errorf("unable to complete single reservation: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) return } @@ -1405,8 +1397,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { ourCommitSig, err := lnwire.NewSigFromRawSignature(sig) if err != nil { fndgLog.Errorf("unable to parse signature: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) deleteFromDatabase() return } @@ -1415,10 +1406,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { ChanID: channelID, CommitSig: ourCommitSig, } - if err := f.cfg.SendToPeer(peerKey, fundingSigned); err != nil { + if err := fmsg.peer.SendMessage(false, fundingSigned); err != nil { fndgLog.Errorf("unable to send FundingSigned message: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) deleteFromDatabase() return } @@ -1476,8 +1466,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { err := fmt.Errorf("timeout waiting for funding tx "+ "(%v) to confirm", completeChan.FundingOutpoint) fndgLog.Warnf(err.Error()) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) deleteFromDatabase() return case <-f.quit: @@ -1496,8 +1485,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // Success, funding transaction was confirmed. f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID) - err := f.handleFundingConfirmation(completeChan, - shortChanID) + err := f.handleFundingConfirmation( + fmsg.peer, completeChan, shortChanID, + ) if err != nil { fndgLog.Errorf("failed to handle funding"+ "confirmation: %v", err) @@ -1509,10 +1499,10 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // processFundingSigned sends a single funding sign complete message along with // the source peer to the funding manager. func (f *fundingManager) processFundingSigned(msg *lnwire.FundingSigned, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingSignedMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingSignedMsg{msg, peer}: case <-f.quit: return } @@ -1535,20 +1525,17 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { err := fmt.Errorf("Unable to find signed reservation for "+ "chan_id=%x", fmsg.msg.ChanID) fndgLog.Warnf(err.Error()) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - fmsg.msg.ChanID, err) + f.failFundingFlow(fmsg.peer, fmsg.msg.ChanID, err) return } - peerKey := fmsg.peerAddress.IdentityKey - resCtx, err := f.getReservationCtx(fmsg.peerAddress.IdentityKey, - pendingChanID) + peerKey := fmsg.peer.IdentityKey() + resCtx, err := f.getReservationCtx(peerKey, pendingChanID) if err != nil { fndgLog.Warnf("Unable to find reservation (peerID:%v, chanID:%x)", peerKey, pendingChanID[:]) // TODO: add ErrChanNotFound? - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) return } @@ -1568,8 +1555,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { completeChan, err := resCtx.reservation.CompleteReservation(nil, commitSig) if err != nil { fndgLog.Errorf("Unable to complete reservation sign complete: %v", err) - f.failFundingFlow(fmsg.peerAddress.IdentityKey, - pendingChanID, err) + f.failFundingFlow(fmsg.peer, pendingChanID, err) return } @@ -1583,7 +1569,8 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { } fndgLog.Infof("Finalizing pendingID(%x) over ChannelPoint(%v), "+ - "waiting for channel open on-chain", pendingChanID[:], fundingPoint) + "waiting for channel open on-chain", pendingChanID[:], + fundingPoint) // Send an update to the upstream client that the negotiation process // is over. @@ -1648,7 +1635,9 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { } defer lnChannel.Stop() - err = f.sendFundingLocked(completeChan, lnChannel, shortChanID) + err = f.sendFundingLocked( + fmsg.peer, completeChan, lnChannel, shortChanID, + ) if err != nil { fndgLog.Errorf("failed sending fundingLocked: %v", err) return @@ -1894,10 +1883,11 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open } // handleFundingConfirmation is a wrapper method for creating a new -// lnwallet.LightningChannel object, calling sendFundingLocked, addToRouterGraph, -// and annAfterSixConfs. This is called after the funding transaction is -// confirmed. -func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenChannel, +// lnwallet.LightningChannel object, calling sendFundingLocked, +// addToRouterGraph, and annAfterSixConfs. This is called after the funding +// transaction is confirmed. +func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer, + completeChan *channeldb.OpenChannel, shortChanID *lnwire.ShortChannelID) error { // We create the state-machine object which wraps the database state. @@ -1913,7 +1903,7 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC fndgLog.Debugf("ChannelID(%v) is now fully confirmed!", chanID) - err = f.sendFundingLocked(completeChan, lnChannel, shortChanID) + err = f.sendFundingLocked(peer, completeChan, lnChannel, shortChanID) if err != nil { return fmt.Errorf("failed sending fundingLocked: %v", err) } @@ -1933,11 +1923,12 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC // sendFundingLocked creates and sends the fundingLocked message. // This should be called after the funding transaction has been confirmed, // and the channelState is 'markedOpen'. -func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel, - channel *lnwallet.LightningChannel, +func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, + completeChan *channeldb.OpenChannel, channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) error { chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) + peerKey := completeChan.IdentityPub // Next, we'll send over the funding locked message which marks that we // consider the channel open by presenting the remote party with our @@ -1962,29 +1953,26 @@ func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel, // down. for { fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+ - "peer %x", chanID, - completeChan.IdentityPub.SerializeCompressed()) + "peer %x", chanID, peerKey.SerializeCompressed()) - err = f.cfg.SendToPeer(completeChan.IdentityPub, - fundingLockedMsg) - if err == nil { - // Sending succeeded, we can break out and continue - // the funding flow. + if err := peer.SendMessage(false, fundingLockedMsg); err == nil { + // Sending succeeded, we can break out and continue the + // funding flow. break } - fndgLog.Warnf("unable to send fundingLocked to peer %x: "+ - "%v. Will retry when online", - completeChan.IdentityPub.SerializeCompressed(), err) + fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+ + "Will retry when online", peerKey.SerializeCompressed(), + err) - connected := make(chan struct{}) + connected := make(chan lnpeer.Peer, 1) f.cfg.NotifyWhenOnline(completeChan.IdentityPub, connected) + select { case <-connected: fndgLog.Infof("Peer(%x) came back online, will retry "+ "sending FundingLocked for ChannelID(%v)", - completeChan.IdentityPub.SerializeCompressed(), - chanID) + peerKey.SerializeCompressed(), chanID) // Retry sending. case <-f.quit: @@ -2165,10 +2153,10 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, // processFundingLocked sends a message to the fundingManager allowing it to // finish the funding workflow. func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked, - peerAddress *lnwire.NetAddress) { + peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingLockedMsg{msg, peerAddress}: + case f.fundingMsgs <- &fundingLockedMsg{msg, peer}: case <-f.quit: return } @@ -2180,7 +2168,7 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { defer f.wg.Done() fndgLog.Debugf("Received FundingLocked for ChannelID(%v) from "+ "peer %x", fmsg.msg.ChanID, - fmsg.peerAddress.IdentityKey.SerializeCompressed()) + fmsg.peer.IdentityKey().SerializeCompressed()) // If we are currently in the process of handling a funding locked // message for this channel, ignore. @@ -2276,32 +2264,11 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { f.barrierMtx.Unlock() }() - // Finally, we'll find the peer that sent us this message so we can - // provide it with the fully initialized channel state. - peer, err := f.cfg.FindPeer(fmsg.peerAddress.IdentityKey) - if err != nil { - fndgLog.Errorf("Unable to find peer: %v", err) + if err := fmsg.peer.AddNewChannel(channel, f.quit); err != nil { + fndgLog.Errorf("Unable to add new channel %v with peer %x: %v", + fmsg.peer.IdentityKey().SerializeCompressed(), + *channel.ChanPoint, err) channel.Stop() - return - } - newChanDone := make(chan struct{}) - newChanMsg := &newChannelMsg{ - channel: channel, - done: newChanDone, - } - - select { - case peer.newChannels <- newChanMsg: - case <-f.quit: - return - } - - // We pause here to wait for the peer to recognize the new channel - // before we close the channel barrier corresponding to the channel. - select { - case <-f.quit: - return - case <-newChanDone: // Fallthrough if we're not quitting. } } @@ -2520,11 +2487,9 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe // initFundingWorkflow sends a message to the funding manager instructing it // to initiate a single funder workflow with the source peer. // TODO(roasbeef): re-visit blocking nature.. -func (f *fundingManager) initFundingWorkflow(peerAddress *lnwire.NetAddress, - req *openChanReq) { - +func (f *fundingManager) initFundingWorkflow(peer lnpeer.Peer, req *openChanReq) { f.fundingRequests <- &initFundingMsg{ - peerAddress: peerAddress, + peer: peer, openChanReq: req, } } @@ -2534,7 +2499,7 @@ func (f *fundingManager) initFundingWorkflow(peerAddress *lnwire.NetAddress, // funding workflow. func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { var ( - peerKey = msg.peerAddress.IdentityKey + peerKey = msg.peer.IdentityKey() localAmt = msg.localFundingAmt remoteAmt = msg.remoteFundingAmt capacity = localAmt + remoteAmt @@ -2553,8 +2518,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { fndgLog.Infof("Initiating fundingRequest(localAmt=%v, remoteAmt=%v, "+ "capacity=%v, chainhash=%v, addr=%v, dustLimit=%v)", localAmt, - msg.pushAmt, capacity, msg.chainHash, msg.peerAddress.Address, - ourDustLimit) + msg.pushAmt, capacity, msg.chainHash, peerKey, ourDustLimit) // First, we'll query the fee estimator for a fee that should get the // commitment transaction confirmed by the next few blocks (conf target @@ -2590,7 +2554,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { // request will fail, and be aborted. reservation, err := f.cfg.Wallet.InitChannelReservation( capacity, localAmt, msg.pushAmt, commitFeePerKw, - msg.fundingFeePerVSize, peerKey, msg.peerAddress.Address, + msg.fundingFeePerVSize, peerKey, msg.peer.Address(), &msg.chainHash, channelFlags, ) if err != nil { @@ -2631,7 +2595,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { remoteCsvDelay: remoteCsvDelay, remoteMinHtlc: minHtlc, reservation: reservation, - peerAddress: msg.peerAddress, + peer: msg.peer, updates: msg.updates, err: msg.err, } @@ -2653,7 +2617,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { maxHtlcs := f.cfg.RequiredRemoteMaxHTLCs(capacity) fndgLog.Infof("Starting funding workflow with %v for pendingID(%x)", - msg.peerAddress.Address, chanID) + msg.peer.Address(), chanID) fundingOpen := lnwire.OpenChannel{ ChainHash: *f.cfg.Wallet.Cfg.NetParams.GenesisHash, @@ -2675,7 +2639,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { FirstCommitmentPoint: ourContribution.FirstCommitmentPoint, ChannelFlags: channelFlags, } - if err := f.cfg.SendToPeer(peerKey, &fundingOpen); err != nil { + if err := msg.peer.SendMessage(false, &fundingOpen); err != nil { e := fmt.Errorf("Unable to send funding request message: %v", err) fndgLog.Errorf(e.Error()) @@ -2786,10 +2750,11 @@ func (f *fundingManager) pruneZombieReservations() { f.resMtx.RUnlock() for pendingChanID, resCtx := range zombieReservations { - err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+ - "chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:]) + err := fmt.Errorf("reservation timed out waiting for peer "+ + "(peerID:%v, chanID:%x)", resCtx.peer.IdentityKey(), + pendingChanID[:]) fndgLog.Warnf(err.Error()) - f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err) + f.failFundingFlow(resCtx.peer, pendingChanID, err) } } diff --git a/peer.go b/peer.go index 4d720fe8..e1113c24 100644 --- a/peer.go +++ b/peer.go @@ -911,15 +911,15 @@ out: p.queueMsg(lnwire.NewPong(pongBytes), nil) case *lnwire.OpenChannel: - p.server.fundingMgr.processFundingOpen(msg, p.addr) + p.server.fundingMgr.processFundingOpen(msg, p) case *lnwire.AcceptChannel: - p.server.fundingMgr.processFundingAccept(msg, p.addr) + p.server.fundingMgr.processFundingAccept(msg, p) case *lnwire.FundingCreated: - p.server.fundingMgr.processFundingCreated(msg, p.addr) + p.server.fundingMgr.processFundingCreated(msg, p) case *lnwire.FundingSigned: - p.server.fundingMgr.processFundingSigned(msg, p.addr) + p.server.fundingMgr.processFundingSigned(msg, p) case *lnwire.FundingLocked: - p.server.fundingMgr.processFundingLocked(msg, p.addr) + p.server.fundingMgr.processFundingLocked(msg, p) case *lnwire.Shutdown: select { diff --git a/server.go b/server.go index 84cb4237..cd979aa5 100644 --- a/server.go +++ b/server.go @@ -2520,7 +2520,7 @@ func (s *server) OpenChannel(nodeKey *btcec.PublicKey, // TODO(roasbeef): pass in chan that's closed if/when funding succeeds // so can track as persistent peer? - go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req) + go s.fundingMgr.initFundingWorkflow(targetPeer, req) return updateChan, errChan }