cln_plugin: replay htlcs on reconnect

This commit is contained in:
Jesse de Wit
2023-08-31 14:45:24 +02:00
parent f03f5c0889
commit f3093cd23c
2 changed files with 80 additions and 20 deletions

View File

@@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/breez/lspd/cln_plugin/proto" "github.com/breez/lspd/cln_plugin/proto"
orderedmap "github.com/wk8/go-ordered-map/v2"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
) )
@@ -52,6 +53,7 @@ type server struct {
htlcStream proto.ClnPlugin_HtlcStreamServer htlcStream proto.ClnPlugin_HtlcStreamServer
htlcSendQueue chan *htlcAcceptedMsg htlcSendQueue chan *htlcAcceptedMsg
htlcRecvQueue chan *htlcResultMsg htlcRecvQueue chan *htlcResultMsg
inflightHtlcs *orderedmap.OrderedMap[string, *htlcAcceptedMsg]
custommsgNewSubscriber chan struct{} custommsgNewSubscriber chan struct{}
custommsgStream proto.ClnPlugin_CustomMsgStreamServer custommsgStream proto.ClnPlugin_CustomMsgStreamServer
custommsgSendQueue chan *custommsgMsg custommsgSendQueue chan *custommsgMsg
@@ -71,6 +73,7 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server {
// cln plugin. If there is no subscriber active within the subscriber // cln plugin. If there is no subscriber active within the subscriber
// timeout period these results can be put directly on the receive queue. // timeout period these results can be put directly on the receive queue.
htlcRecvQueue: make(chan *htlcResultMsg, 10000), htlcRecvQueue: make(chan *htlcResultMsg, 10000),
inflightHtlcs: orderedmap.New[string, *htlcAcceptedMsg](),
custommsgRecvQueue: make(chan *custommsgResultMsg, 10000), custommsgRecvQueue: make(chan *custommsgResultMsg, 10000),
started: make(chan struct{}), started: make(chan struct{}),
startError: make(chan error, 1), startError: make(chan error, 1),
@@ -162,6 +165,19 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
return fmt.Errorf("already subscribed") return fmt.Errorf("already subscribed")
} }
newTimeout := time.Now().Add(s.subscriberTimeout)
// Replay in-flight htlcs in fifo order
for pair := s.inflightHtlcs.Oldest(); pair != nil; pair = pair.Next() {
err := sendHtlcAccepted(stream, pair.Value)
if err != nil {
s.mtx.Unlock()
return err
}
// Reset the subscriber timeout for this htlc.
pair.Value.timeout = newTimeout
}
s.htlcStream = stream s.htlcStream = stream
// Notify listeners that a new subscriber is active. Replace the chan with // Notify listeners that a new subscriber is active. Replace the chan with
@@ -199,6 +215,9 @@ func (s *server) ReceiveHtlcResolution() (string, interface{}) {
case <-s.done: case <-s.done:
return "", nil return "", nil
case msg := <-s.htlcRecvQueue: case msg := <-s.htlcRecvQueue:
s.mtx.Lock()
s.inflightHtlcs.Delete(msg.id)
s.mtx.Unlock()
return msg.id, msg.result return msg.id, msg.result
} }
} }
@@ -258,31 +277,22 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
} }
} }
// Add the htlc to in-flight htlcs
s.mtx.Lock()
s.inflightHtlcs.Set(msg.id, msg)
// There is a subscriber. Attempt to send the htlc_accepted message. // There is a subscriber. Attempt to send the htlc_accepted message.
err := stream.Send(&proto.HtlcAccepted{ err := sendHtlcAccepted(stream, msg)
Correlationid: msg.id,
Onion: &proto.Onion{
Payload: msg.htlc.Onion.Payload,
ShortChannelId: msg.htlc.Onion.ShortChannelId,
ForwardMsat: msg.htlc.Onion.ForwardMsat,
OutgoingCltvValue: msg.htlc.Onion.OutgoingCltvValue,
SharedSecret: msg.htlc.Onion.SharedSecret,
NextOnion: msg.htlc.Onion.NextOnion,
},
Htlc: &proto.Htlc{
ShortChannelId: msg.htlc.Htlc.ShortChannelId,
Id: msg.htlc.Htlc.Id,
AmountMsat: msg.htlc.Htlc.AmountMsat,
CltvExpiry: msg.htlc.Htlc.CltvExpiry,
CltvExpiryRelative: msg.htlc.Htlc.CltvExpiryRelative,
PaymentHash: msg.htlc.Htlc.PaymentHash,
},
ForwardTo: msg.htlc.ForwardTo,
})
// If there is no error, we're done. // If there is no error, we're done.
if err == nil { if err == nil {
s.mtx.Unlock()
return return
} else {
// Remove the htlc from inflight htlcs again on error, so it won't
// get replayed twice in a row.
s.inflightHtlcs.Delete(msg.id)
s.mtx.Unlock()
} }
// If we end up here, there was an error sending the message to the // If we end up here, there was an error sending the message to the
@@ -324,6 +334,11 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution {
s.mtx.Lock() s.mtx.Lock()
stream := s.htlcStream stream := s.htlcStream
ns := s.htlcnewSubscriber ns := s.htlcnewSubscriber
oldestHtlc := s.inflightHtlcs.Oldest()
var htlcTimeout time.Duration = 1 << 62 // practically infinite
if oldestHtlc != nil {
htlcTimeout = time.Until(oldestHtlc.Value.timeout)
}
s.mtx.Unlock() s.mtx.Unlock()
if stream == nil { if stream == nil {
@@ -335,6 +350,24 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution {
case <-ns: case <-ns:
log.Printf("New subscription available for htlc receive, continue receive.") log.Printf("New subscription available for htlc receive, continue receive.")
continue continue
case <-time.After(htlcTimeout):
log.Printf(
"WARNING: htlc with id '%s' timed out after '%v' waiting "+
"for grpc subscriber: %+v",
oldestHtlc.Value.id,
s.subscriberTimeout,
oldestHtlc.Value.htlc,
)
// If the subscriber timeout expires while holding a htlc
// we short circuit the htlc by sending the default result
// (continue) to cln.
return &proto.HtlcResolution{
Correlationid: oldestHtlc.Value.id,
Outcome: &proto.HtlcResolution_Continue{
Continue: &proto.HtlcContinue{},
},
}
} }
} }
@@ -561,3 +594,26 @@ func (s *server) defaultResult() interface{} {
"result": "continue", "result": "continue",
} }
} }
func sendHtlcAccepted(stream proto.ClnPlugin_HtlcStreamServer, msg *htlcAcceptedMsg) error {
return stream.Send(&proto.HtlcAccepted{
Correlationid: msg.id,
Onion: &proto.Onion{
Payload: msg.htlc.Onion.Payload,
ShortChannelId: msg.htlc.Onion.ShortChannelId,
ForwardMsat: msg.htlc.Onion.ForwardMsat,
OutgoingCltvValue: msg.htlc.Onion.OutgoingCltvValue,
SharedSecret: msg.htlc.Onion.SharedSecret,
NextOnion: msg.htlc.Onion.NextOnion,
},
Htlc: &proto.Htlc{
ShortChannelId: msg.htlc.Htlc.ShortChannelId,
Id: msg.htlc.Htlc.Id,
AmountMsat: msg.htlc.Htlc.AmountMsat,
CltvExpiry: msg.htlc.Htlc.CltvExpiry,
CltvExpiryRelative: msg.htlc.Htlc.CltvExpiryRelative,
PaymentHash: msg.htlc.Htlc.PaymentHash,
},
ForwardTo: msg.htlc.ForwardTo,
})
}

4
go.mod
View File

@@ -30,6 +30,8 @@ require (
require ( require (
github.com/Microsoft/go-winio v0.5.2 // indirect github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Yawning/aez v0.0.0-20211027044916-e49e68abd344 // indirect github.com/Yawning/aez v0.0.0-20211027044916-e49e68abd344 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect
github.com/ethereum/go-ethereum v1.12.1 // indirect github.com/ethereum/go-ethereum v1.12.1 // indirect
@@ -37,6 +39,7 @@ require (
github.com/google/uuid v1.3.0 // indirect github.com/google/uuid v1.3.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/lightninglabs/neutrino/cache v1.1.1 // indirect github.com/lightninglabs/neutrino/cache v1.1.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-isatty v0.0.16 // indirect
github.com/moby/term v0.0.0-20221120202655-abb19827d345 // indirect github.com/moby/term v0.0.0-20221120202655-abb19827d345 // indirect
github.com/morikuni/aec v1.0.0 // indirect github.com/morikuni/aec v1.0.0 // indirect
@@ -151,6 +154,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/ulikunitz/xz v0.5.10 // indirect github.com/ulikunitz/xz v0.5.10 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/bbolt v1.3.6 // indirect