package cln_plugin import ( "fmt" "log" "net" "sync" "time" "github.com/breez/lspd/cln_plugin/proto" grpc "google.golang.org/grpc" "google.golang.org/grpc/keepalive" ) // Internal htlc_accepted message meant for the sendQueue. type htlcAcceptedMsg struct { id string htlc *HtlcAccepted timeout time.Time } // Internal htlc result message meant for the recvQueue. type htlcResultMsg struct { id string result interface{} } // Internal custommsg message meant for the sendQueue. type custommsgMsg struct { id string custommsg *CustomMessageRequest timeout time.Time } // Internal custommsg result message meant for the recvQueue. type custommsgResultMsg struct { id string result interface{} } type server struct { proto.ClnPluginServer listenAddress string subscriberTimeout time.Duration grpcServer *grpc.Server mtx sync.Mutex started chan struct{} done chan struct{} completed chan struct{} startError chan error htlcnewSubscriber chan struct{} htlcStream proto.ClnPlugin_HtlcStreamServer htlcSendQueue chan *htlcAcceptedMsg htlcRecvQueue chan *htlcResultMsg custommsgNewSubscriber chan struct{} custommsgStream proto.ClnPlugin_CustomMsgStreamServer custommsgSendQueue chan *custommsgMsg custommsgRecvQueue chan *custommsgResultMsg } // Creates a new grpc server func NewServer(listenAddress string, subscriberTimeout time.Duration) *server { // TODO: Set a sane max queue size return &server{ listenAddress: listenAddress, subscriberTimeout: subscriberTimeout, // The send queue exists to buffer messages until a subscriber is active. htlcSendQueue: make(chan *htlcAcceptedMsg, 10000), custommsgSendQueue: make(chan *custommsgMsg, 10000), // The receive queue exists mainly to allow returning timeouts to the // cln plugin. If there is no subscriber active within the subscriber // timeout period these results can be put directly on the receive queue. htlcRecvQueue: make(chan *htlcResultMsg, 10000), custommsgRecvQueue: make(chan *custommsgResultMsg, 10000), started: make(chan struct{}), startError: make(chan error, 1), } } // Starts the grpc server. Blocks until the servver is stopped. WaitStarted can // be called to ensure the server is started without errors if this function // is run as a goroutine. func (s *server) Start() error { s.mtx.Lock() if s.grpcServer != nil { s.mtx.Unlock() return nil } lis, err := net.Listen("tcp", s.listenAddress) if err != nil { log.Printf("ERROR Server failed to listen: %v", err) s.startError <- err s.mtx.Unlock() return err } s.done = make(chan struct{}) s.completed = make(chan struct{}) s.htlcnewSubscriber = make(chan struct{}) s.custommsgNewSubscriber = make(chan struct{}) s.grpcServer = grpc.NewServer( grpc.KeepaliveParams(keepalive.ServerParameters{ Time: time.Duration(1) * time.Second, Timeout: time.Duration(10) * time.Second, }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: time.Duration(1) * time.Second, }), ) s.mtx.Unlock() proto.RegisterClnPluginServer(s.grpcServer, s) log.Printf("Server starting to listen on %s.", s.listenAddress) go s.listenHtlcRequests() go s.listenHtlcResponses() go s.listenCustomMsgRequests() close(s.started) err = s.grpcServer.Serve(lis) close(s.completed) return err } // Waits until the server has started, or errored during startup. func (s *server) WaitStarted() error { select { case <-s.started: return nil case err := <-s.startError: return err } } // Stops all work from the grpc server immediately. func (s *server) Stop() { s.mtx.Lock() defer s.mtx.Unlock() log.Printf("Server Stop() called.") if s.grpcServer == nil { return } s.grpcServer.Stop() s.grpcServer = nil close(s.done) <-s.completed log.Printf("Server stopped.") } // Grpc method that is called when a new client subscribes. There can only be // one subscriber active at a time. If there is an error receiving or sending // from or to the subscriber, the subscription is closed. func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { s.mtx.Lock() if s.htlcStream == nil { log.Printf("Got a new HTLC stream subscription request.") } else { s.mtx.Unlock() log.Printf("Got a HTLC stream subscription request, but subscription " + "was already active.") return fmt.Errorf("already subscribed") } s.htlcStream = stream // Notify listeners that a new subscriber is active. Replace the chan with // a new one immediately in case this subscriber is dropped later. close(s.htlcnewSubscriber) s.htlcnewSubscriber = make(chan struct{}) s.mtx.Unlock() <-stream.Context().Done() log.Printf("HtlcStream context is done. Return: %v", stream.Context().Err()) // Remove the subscriber. s.mtx.Lock() s.htlcStream = nil s.mtx.Unlock() return stream.Context().Err() } // Enqueues a htlc_accepted message for send to the grpc client. func (s *server) SendHtlcAccepted(id string, h *HtlcAccepted) { s.htlcSendQueue <- &htlcAcceptedMsg{ id: id, htlc: h, timeout: time.Now().Add(s.subscriberTimeout), } } // Receives the next htlc resolution message from the grpc client. Returns id // and message. Blocks until a message is available. Returns a nil message if // the server is done. This function effectively waits until a subscriber is // active and has sent a message. func (s *server) ReceiveHtlcResolution() (string, interface{}) { select { case <-s.done: return "", nil case msg := <-s.htlcRecvQueue: return msg.id, msg.result } } // Listens to sendQueue for htlc_accepted requests from cln. The message will be // held until a subscriber is active, or the subscriber timeout expires. The // messages are sent to the grpc client in fifo order. func (s *server) listenHtlcRequests() { for { select { case <-s.done: log.Printf("listenHtlcRequests received done. Stop listening.") return case msg := <-s.htlcSendQueue: s.handleHtlcAccepted(msg) } } } // Attempts to send a htlc_accepted message to the grpc client. The message will // be held until a subscriber is active, or the subscriber timeout expires. func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) { for { s.mtx.Lock() stream := s.htlcStream ns := s.htlcnewSubscriber s.mtx.Unlock() // If there is no active subscription, wait until there is a new // subscriber, or the message times out. if stream == nil { select { case <-s.done: log.Printf("handleHtlcAccepted received server done. Stop processing.") return case <-ns: log.Printf("got a new subscriber. continue handleHtlcAccepted.") continue case <-time.After(time.Until(msg.timeout)): log.Printf( "WARNING: htlc with id '%s' timed out after '%v' waiting "+ "for grpc subscriber: %+v", msg.id, s.subscriberTimeout, msg.htlc, ) // If the subscriber timeout expires while holding the htlc // we short circuit the htlc by sending the default result // (continue) to cln. s.htlcRecvQueue <- &htlcResultMsg{ id: msg.id, result: s.defaultResult(), } return } } // There is a subscriber. Attempt to send the htlc_accepted message. err := stream.Send(&proto.HtlcAccepted{ Correlationid: msg.id, Onion: &proto.Onion{ Payload: msg.htlc.Onion.Payload, ShortChannelId: msg.htlc.Onion.ShortChannelId, ForwardMsat: msg.htlc.Onion.ForwardMsat, OutgoingCltvValue: msg.htlc.Onion.OutgoingCltvValue, SharedSecret: msg.htlc.Onion.SharedSecret, NextOnion: msg.htlc.Onion.NextOnion, }, Htlc: &proto.Htlc{ ShortChannelId: msg.htlc.Htlc.ShortChannelId, Id: msg.htlc.Htlc.Id, AmountMsat: msg.htlc.Htlc.AmountMsat, CltvExpiry: msg.htlc.Htlc.CltvExpiry, CltvExpiryRelative: msg.htlc.Htlc.CltvExpiryRelative, PaymentHash: msg.htlc.Htlc.PaymentHash, }, ForwardTo: msg.htlc.ForwardTo, }) // If there is no error, we're done. if err == nil { return } // If we end up here, there was an error sending the message to the // grpc client. // TODO: If the Send errors, but the context is not done, this will // currently retry immediately. Check whether the context is really // done on an error! log.Printf("Error sending htlc_accepted message to subscriber. Retrying: %v", err) } } // Listens to htlc responses from the grpc client and appends them to the // receive queue. The messages from the receive queue are read in the Receive // function. func (s *server) listenHtlcResponses() { for { select { case <-s.done: log.Printf("listenHtlcResponses received done. Stopping listening.") return default: resp := s.recvHtlcResolution() s.htlcRecvQueue <- &htlcResultMsg{ id: resp.Correlationid, result: s.mapHtlcResult(resp.Outcome), } } } } // Helper function that blocks until a message from a grpc client is received // or the server stops. Either returns a received message, or nil if the server // has stopped. func (s *server) recvHtlcResolution() *proto.HtlcResolution { for { // make a copy of the used fields, to make sure state updates don't // surprise us. The newSubscriber chan is swapped whenever a new // subscriber arrives. s.mtx.Lock() stream := s.htlcStream ns := s.htlcnewSubscriber s.mtx.Unlock() if stream == nil { log.Printf("Got no subscribers for htlc receive. Waiting for subscriber.") select { case <-s.done: log.Printf("Done signalled, stopping htlc receive.") return nil case <-ns: log.Printf("New subscription available for htlc receive, continue receive.") continue } } // There is a subscription active. Attempt to receive a message. r, err := stream.Recv() if err == nil { log.Printf("Received HtlcResolution %+v", r) return r } // Receiving the message failed, so the subscription is broken. Remove // it if it hasn't been updated already. We'll try receiving again in // the next iteration of the for loop. // TODO: If the Recv errors, but the context is not done, this will // currently retry immediately. Check whether the context is really // done on an error! log.Printf("Recv() errored, Retrying: %v", err) } } // Maps a grpc result to the corresponding result for cln. The cln message // is a raw json message, so it's easiest to use a map directly. func (s *server) mapHtlcResult(outcome interface{}) interface{} { // result: continue cont, ok := outcome.(*proto.HtlcResolution_Continue) if ok { result := map[string]interface{}{ "result": "continue", } if cont.Continue.ForwardTo != nil { result["forward_to"] = *cont.Continue.ForwardTo } if cont.Continue.Payload != nil { result["payload"] = *cont.Continue.Payload } return result } // result: fail fail, ok := outcome.(*proto.HtlcResolution_Fail) if ok { result := map[string]interface{}{ "result": "fail", } fm, ok := fail.Fail.Failure.(*proto.HtlcFail_FailureMessage) if ok { result["failure_message"] = fm.FailureMessage } fo, ok := fail.Fail.Failure.(*proto.HtlcFail_FailureOnion) if ok { result["failure_onion"] = fo.FailureOnion } return result } // result: resolve resolve, ok := outcome.(*proto.HtlcResolution_Resolve) if ok { result := map[string]interface{}{ "result": "resolve", "payment_key": resolve.Resolve.PaymentKey, } return result } // On an unknown result we haven't implemented all possible cases from the // grpc message. We don't understand what's going on, so we'll return // result: continue. log.Printf("Unexpected htlc resolution type %T: %+v", outcome, outcome) return s.defaultResult() } // Grpc method that is called when a new client subscribes. There can only be // one subscriber active at a time. If there is an error receiving or sending // from or to the subscriber, the subscription is closed. func (s *server) CustomMsgStream( _ *proto.CustomMessageRequest, stream proto.ClnPlugin_CustomMsgStreamServer, ) error { s.mtx.Lock() if s.custommsgStream == nil { log.Printf("Got a new custommsg stream subscription request.") } else { s.mtx.Unlock() log.Printf("Got a custommsg stream subscription request, but " + "subscription was already active.") return fmt.Errorf("already subscribed") } s.custommsgStream = stream // Notify listeners that a new subscriber is active. Replace the chan with // a new one immediately in case this subscriber is dropped later. close(s.custommsgNewSubscriber) s.custommsgNewSubscriber = make(chan struct{}) s.mtx.Unlock() <-stream.Context().Done() log.Printf( "CustomMsgStream context is done. Return: %v", stream.Context().Err(), ) // Remove the subscriber. s.mtx.Lock() s.custommsgStream = nil s.mtx.Unlock() return stream.Context().Err() } // Enqueues a htlc_accepted message for send to the grpc client. func (s *server) SendCustomMessage(id string, c *CustomMessageRequest) { s.custommsgSendQueue <- &custommsgMsg{ id: id, custommsg: c, timeout: time.Now().Add(s.subscriberTimeout), } } // Receives the next custommsg response message from the grpc client. Returns id // and message. Blocks until a message is available. Returns a nil message if // the server is done. This function effectively waits until a subscriber is // active and has sent a message. func (s *server) ReceiveCustomMessageResponse() (string, interface{}) { select { case <-s.done: return "", nil case msg := <-s.custommsgRecvQueue: return msg.id, msg.result } } // Listens to sendQueue for custommsg requests from cln. The message will be // held until a subscriber is active, or the subscriber timeout expires. The // messages are sent to the grpc client in fifo order. func (s *server) listenCustomMsgRequests() { for { select { case <-s.done: log.Printf("listenCustomMsgRequests received done. Stop listening.") return case msg := <-s.custommsgSendQueue: s.handleCustomMsg(msg) } } } // Attempts to send a custommsg message to the grpc client. The message will // be held until a subscriber is active, or the subscriber timeout expires. func (s *server) handleCustomMsg(msg *custommsgMsg) { for { s.mtx.Lock() stream := s.custommsgStream ns := s.custommsgNewSubscriber s.mtx.Unlock() // If there is no active subscription, wait until there is a new // subscriber, or the message times out. if stream == nil { select { case <-s.done: log.Printf("handleCustomMsg received server done. Stop processing.") return case <-ns: log.Printf("got a new subscriber. continue handleCustomMsg.") continue case <-time.After(time.Until(msg.timeout)): log.Printf( "WARNING: custommsg with id '%s' timed out after '%v' waiting "+ "for grpc subscriber: %+v", msg.id, s.subscriberTimeout, msg.custommsg, ) // If the subscriber timeout expires while holding the custommsg // we ignore the message by sending the default result // (continue) to cln. s.custommsgRecvQueue <- &custommsgResultMsg{ id: msg.id, result: s.defaultResult(), } return } } // There is a subscriber. Attempt to send the custommsg message. err := stream.Send(&proto.CustomMessage{ PeerId: msg.custommsg.PeerId, Payload: msg.custommsg.Payload, }) // If there is no error, we're done, mark the message as handled. if err == nil { s.custommsgRecvQueue <- &custommsgResultMsg{ id: msg.id, result: s.defaultResult(), } return } // If we end up here, there was an error sending the message to the // grpc client. // TODO: If the Send errors, but the context is not done, this will // currently retry immediately. Check whether the context is really // done on an error! log.Printf("Error sending custommsg message to subscriber. Retrying: %v", err) } } // Returns a result: continue message. func (s *server) defaultResult() interface{} { return map[string]interface{}{ "result": "continue", } }