diff --git a/cln_plugin/server.go b/cln_plugin/server.go index ba8b7b4..4320cf8 100644 --- a/cln_plugin/server.go +++ b/cln_plugin/server.go @@ -8,6 +8,7 @@ import ( "time" "github.com/breez/lspd/cln_plugin/proto" + orderedmap "github.com/wk8/go-ordered-map/v2" grpc "google.golang.org/grpc" "google.golang.org/grpc/keepalive" ) @@ -52,6 +53,7 @@ type server struct { htlcStream proto.ClnPlugin_HtlcStreamServer htlcSendQueue chan *htlcAcceptedMsg htlcRecvQueue chan *htlcResultMsg + inflightHtlcs *orderedmap.OrderedMap[string, *htlcAcceptedMsg] custommsgNewSubscriber chan struct{} custommsgStream proto.ClnPlugin_CustomMsgStreamServer custommsgSendQueue chan *custommsgMsg @@ -71,6 +73,7 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server { // cln plugin. If there is no subscriber active within the subscriber // timeout period these results can be put directly on the receive queue. htlcRecvQueue: make(chan *htlcResultMsg, 10000), + inflightHtlcs: orderedmap.New[string, *htlcAcceptedMsg](), custommsgRecvQueue: make(chan *custommsgResultMsg, 10000), started: make(chan struct{}), startError: make(chan error, 1), @@ -162,6 +165,19 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { return fmt.Errorf("already subscribed") } + newTimeout := time.Now().Add(s.subscriberTimeout) + // Replay in-flight htlcs in fifo order + for pair := s.inflightHtlcs.Oldest(); pair != nil; pair = pair.Next() { + err := sendHtlcAccepted(stream, pair.Value) + if err != nil { + s.mtx.Unlock() + return err + } + + // Reset the subscriber timeout for this htlc. + pair.Value.timeout = newTimeout + } + s.htlcStream = stream // Notify listeners that a new subscriber is active. Replace the chan with @@ -199,6 +215,9 @@ func (s *server) ReceiveHtlcResolution() (string, interface{}) { case <-s.done: return "", nil case msg := <-s.htlcRecvQueue: + s.mtx.Lock() + s.inflightHtlcs.Delete(msg.id) + s.mtx.Unlock() return msg.id, msg.result } } @@ -258,31 +277,22 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) { } } + // Add the htlc to in-flight htlcs + s.mtx.Lock() + s.inflightHtlcs.Set(msg.id, msg) + // There is a subscriber. Attempt to send the htlc_accepted message. - err := stream.Send(&proto.HtlcAccepted{ - Correlationid: msg.id, - Onion: &proto.Onion{ - Payload: msg.htlc.Onion.Payload, - ShortChannelId: msg.htlc.Onion.ShortChannelId, - ForwardMsat: msg.htlc.Onion.ForwardMsat, - OutgoingCltvValue: msg.htlc.Onion.OutgoingCltvValue, - SharedSecret: msg.htlc.Onion.SharedSecret, - NextOnion: msg.htlc.Onion.NextOnion, - }, - Htlc: &proto.Htlc{ - ShortChannelId: msg.htlc.Htlc.ShortChannelId, - Id: msg.htlc.Htlc.Id, - AmountMsat: msg.htlc.Htlc.AmountMsat, - CltvExpiry: msg.htlc.Htlc.CltvExpiry, - CltvExpiryRelative: msg.htlc.Htlc.CltvExpiryRelative, - PaymentHash: msg.htlc.Htlc.PaymentHash, - }, - ForwardTo: msg.htlc.ForwardTo, - }) + err := sendHtlcAccepted(stream, msg) // If there is no error, we're done. if err == nil { + s.mtx.Unlock() return + } else { + // Remove the htlc from inflight htlcs again on error, so it won't + // get replayed twice in a row. + s.inflightHtlcs.Delete(msg.id) + s.mtx.Unlock() } // If we end up here, there was an error sending the message to the @@ -324,6 +334,11 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution { s.mtx.Lock() stream := s.htlcStream ns := s.htlcnewSubscriber + oldestHtlc := s.inflightHtlcs.Oldest() + var htlcTimeout time.Duration = 1 << 62 // practically infinite + if oldestHtlc != nil { + htlcTimeout = time.Until(oldestHtlc.Value.timeout) + } s.mtx.Unlock() if stream == nil { @@ -335,6 +350,24 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution { case <-ns: log.Printf("New subscription available for htlc receive, continue receive.") continue + case <-time.After(htlcTimeout): + log.Printf( + "WARNING: htlc with id '%s' timed out after '%v' waiting "+ + "for grpc subscriber: %+v", + oldestHtlc.Value.id, + s.subscriberTimeout, + oldestHtlc.Value.htlc, + ) + + // If the subscriber timeout expires while holding a htlc + // we short circuit the htlc by sending the default result + // (continue) to cln. + return &proto.HtlcResolution{ + Correlationid: oldestHtlc.Value.id, + Outcome: &proto.HtlcResolution_Continue{ + Continue: &proto.HtlcContinue{}, + }, + } } } @@ -561,3 +594,26 @@ func (s *server) defaultResult() interface{} { "result": "continue", } } + +func sendHtlcAccepted(stream proto.ClnPlugin_HtlcStreamServer, msg *htlcAcceptedMsg) error { + return stream.Send(&proto.HtlcAccepted{ + Correlationid: msg.id, + Onion: &proto.Onion{ + Payload: msg.htlc.Onion.Payload, + ShortChannelId: msg.htlc.Onion.ShortChannelId, + ForwardMsat: msg.htlc.Onion.ForwardMsat, + OutgoingCltvValue: msg.htlc.Onion.OutgoingCltvValue, + SharedSecret: msg.htlc.Onion.SharedSecret, + NextOnion: msg.htlc.Onion.NextOnion, + }, + Htlc: &proto.Htlc{ + ShortChannelId: msg.htlc.Htlc.ShortChannelId, + Id: msg.htlc.Htlc.Id, + AmountMsat: msg.htlc.Htlc.AmountMsat, + CltvExpiry: msg.htlc.Htlc.CltvExpiry, + CltvExpiryRelative: msg.htlc.Htlc.CltvExpiryRelative, + PaymentHash: msg.htlc.Htlc.PaymentHash, + }, + ForwardTo: msg.htlc.ForwardTo, + }) +} diff --git a/go.mod b/go.mod index 3d8a129..eb6e972 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,8 @@ require ( require ( github.com/Microsoft/go-winio v0.5.2 // indirect github.com/Yawning/aez v0.0.0-20211027044916-e49e68abd344 // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/docker/distribution v2.8.2+incompatible // indirect github.com/docker/go-units v0.5.0 // indirect github.com/ethereum/go-ethereum v1.12.1 // indirect @@ -37,6 +39,7 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/lightninglabs/neutrino/cache v1.1.1 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/moby/term v0.0.0-20221120202655-abb19827d345 // indirect github.com/morikuni/aec v1.0.0 // indirect @@ -151,6 +154,7 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/ulikunitz/xz v0.5.10 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.3.6 // indirect