Merge pull request #3582 from hsjoberg/pendingchannelevent

rpc: Add PendingOpenChannel to SubscribeChannelEvents
This commit is contained in:
Johan T. Halseth
2020-01-24 14:42:40 +01:00
committed by GitHub
9 changed files with 720 additions and 644 deletions

View File

@@ -20,6 +20,13 @@ type ChannelNotifier struct {
chanDB *channeldb.DB
}
// PendingOpenChannelEvent represents a new event where a new channel has
// entered a pending open state.
type PendingOpenChannelEvent struct {
// ChannelPoint is the channelpoint for the new channel.
ChannelPoint *wire.OutPoint
}
// OpenChannelEvent represents a new event where a channel goes from pending
// open to open.
type OpenChannelEvent struct {
@@ -82,6 +89,16 @@ func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
return c.ntfnServer.Subscribe()
}
// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine that a
// new channel is pending.
func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint) {
event := PendingOpenChannelEvent{ChannelPoint: &chanPoint}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send pending open channel update: %v", err)
}
}
// NotifyOpenChannelEvent notifies the channelEventNotifier goroutine that a
// channel has gone from pending open to open.
func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) {

View File

@@ -363,6 +363,10 @@ type fundingConfig struct {
// and on the requesting node's public key that returns a bool which tells
// the funding manager whether or not to accept the channel.
OpenChannelPredicate chanacceptor.ChannelAcceptor
// NotifyPendingOpenChannelEvent informs the ChannelNotifier when channels
// enter a pending state.
NotifyPendingOpenChannelEvent func(wire.OutPoint)
}
// fundingManager acts as an orchestrator/bridge between the wallet's
@@ -1691,6 +1695,10 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
f.localDiscoverySignals[channelID] = make(chan struct{})
f.localDiscoveryMtx.Unlock()
// Inform the ChannelNotifier that the channel has entered
// pending open state.
f.cfg.NotifyPendingOpenChannelEvent(fundingOut)
// At this point we have sent our last funding message to the
// initiating peer before the funding transaction will be broadcast.
// With this last message, our job as the responder is now complete.
@@ -1835,6 +1843,9 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
select {
case resCtx.updates <- upd:
// Inform the ChannelNotifier that the channel has entered
// pending open state.
f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint)
case <-f.quit:
return
}

View File

@@ -376,11 +376,12 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
publTxChan <- txn
return nil
},
ZombieSweeperInterval: 1 * time.Hour,
ReservationTimeout: 1 * time.Nanosecond,
MaxPendingChannels: DefaultMaxPendingChannels,
NotifyOpenChannelEvent: func(wire.OutPoint) {},
OpenChannelPredicate: chainedAcceptor,
ZombieSweeperInterval: 1 * time.Hour,
ReservationTimeout: 1 * time.Nanosecond,
MaxPendingChannels: DefaultMaxPendingChannels,
NotifyOpenChannelEvent: func(wire.OutPoint) {},
OpenChannelPredicate: chainedAcceptor,
NotifyPendingOpenChannelEvent: func(wire.OutPoint) {},
}
for _, op := range options {

File diff suppressed because it is too large Load Diff

View File

@@ -1930,6 +1930,7 @@ message ChannelEventUpdate {
ChannelCloseSummary closed_channel = 2 [ json_name = "closed_channel" ];
ChannelPoint active_channel = 3 [ json_name = "active_channel" ];
ChannelPoint inactive_channel = 4 [ json_name = "inactive_channel" ];
PendingUpdate pending_open_channel = 6 [json_name = "pending_open_channel"];
}
enum UpdateType {
@@ -1937,6 +1938,7 @@ message ChannelEventUpdate {
CLOSED_CHANNEL = 1;
ACTIVE_CHANNEL = 2;
INACTIVE_CHANNEL = 3;
PENDING_OPEN_CHANNEL = 4;
}
UpdateType type = 5 [ json_name = "type" ];

View File

@@ -1467,7 +1467,8 @@
"OPEN_CHANNEL",
"CLOSED_CHANNEL",
"ACTIVE_CHANNEL",
"INACTIVE_CHANNEL"
"INACTIVE_CHANNEL",
"PENDING_OPEN_CHANNEL"
],
"default": "OPEN_CHANNEL"
},
@@ -2149,6 +2150,9 @@
"inactive_channel": {
"$ref": "#/definitions/lnrpcChannelPoint"
},
"pending_open_channel": {
"$ref": "#/definitions/lnrpcPendingUpdate"
},
"type": {
"$ref": "#/definitions/ChannelEventUpdateUpdateType"
}

View File

@@ -6460,25 +6460,34 @@ func testBasicChannelCreationAndUpdates(net *lntest.NetworkHarness, t *harnessTe
// Since each of the channels just became open, Bob and Alice should
// each receive an open and an active notification for each channel.
var numChannelUpds int
const totalNtfns = 2 * numChannels
const totalNtfns = 3 * numChannels
verifyOpenUpdatesReceived := func(sub channelSubscription) error {
numChannelUpds = 0
for numChannelUpds < totalNtfns {
select {
case update := <-sub.updateChan:
switch update.Type {
case lnrpc.ChannelEventUpdate_ACTIVE_CHANNEL:
if numChannelUpds%2 != 1 {
return fmt.Errorf("expected open" +
"channel ntfn, got active " +
case lnrpc.ChannelEventUpdate_PENDING_OPEN_CHANNEL:
if numChannelUpds%3 != 0 {
return fmt.Errorf("expected " +
"open or active" +
"channel ntfn, got pending open " +
"channel ntfn instead")
}
case lnrpc.ChannelEventUpdate_OPEN_CHANNEL:
if numChannelUpds%2 != 0 {
return fmt.Errorf("expected active" +
if numChannelUpds%3 != 1 {
return fmt.Errorf("expected " +
"pending open or active" +
"channel ntfn, got open" +
"channel ntfn instead")
}
case lnrpc.ChannelEventUpdate_ACTIVE_CHANNEL:
if numChannelUpds%3 != 2 {
return fmt.Errorf("expected " +
"pending open or open" +
"channel ntfn, got active " +
"channel ntfn instead")
}
default:
return fmt.Errorf("update type mismatch: "+
"expected open or active channel "+

View File

@@ -3187,6 +3187,16 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
case e := <-channelEventSub.Updates():
var update *lnrpc.ChannelEventUpdate
switch event := e.(type) {
case channelnotifier.PendingOpenChannelEvent:
update = &lnrpc.ChannelEventUpdate{
Type: lnrpc.ChannelEventUpdate_PENDING_OPEN_CHANNEL,
Channel: &lnrpc.ChannelEventUpdate_PendingOpenChannel{
PendingOpenChannel: &lnrpc.PendingUpdate{
Txid: event.ChannelPoint.Hash[:],
OutputIndex: event.ChannelPoint.Index,
},
},
}
case channelnotifier.OpenChannelEvent:
channel, err := createRPCOpenChannel(r, graph,
event.Channel, true)
@@ -5601,7 +5611,9 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
switch e.(type) {
// We only care about new/closed channels, so we'll
// skip any events for active/inactive channels.
// skip any events for pending/active/inactive channels.
case channelnotifier.PendingOpenChannelEvent:
continue
case channelnotifier.ActiveChannelEvent:
continue
case channelnotifier.InactiveChannelEvent:

View File

@@ -1113,13 +1113,14 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
// channel bandwidth.
return uint16(input.MaxHTLCNumber / 2)
},
ZombieSweeperInterval: 1 * time.Minute,
ReservationTimeout: 10 * time.Minute,
MinChanSize: btcutil.Amount(cfg.MinChanSize),
MaxPendingChannels: cfg.MaxPendingChannels,
RejectPush: cfg.RejectPush,
NotifyOpenChannelEvent: s.channelNotifier.NotifyOpenChannelEvent,
OpenChannelPredicate: chanPredicate,
ZombieSweeperInterval: 1 * time.Minute,
ReservationTimeout: 10 * time.Minute,
MinChanSize: btcutil.Amount(cfg.MinChanSize),
MaxPendingChannels: cfg.MaxPendingChannels,
RejectPush: cfg.RejectPush,
NotifyOpenChannelEvent: s.channelNotifier.NotifyOpenChannelEvent,
OpenChannelPredicate: chanPredicate,
NotifyPendingOpenChannelEvent: s.channelNotifier.NotifyPendingOpenChannelEvent,
})
if err != nil {
return nil, err