From 5c6f7da2655578b4c689f4698ddf1661b2846c2a Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Thu, 10 Aug 2023 08:50:47 +0200 Subject: [PATCH] cln_plugin: rename htlc related fields/functions --- cln_plugin/cln_plugin.go | 12 +++++---- cln_plugin/server.go | 58 ++++++++++++++++++++-------------------- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/cln_plugin/cln_plugin.go b/cln_plugin/cln_plugin.go index 778fa33..ba10f1f 100644 --- a/cln_plugin/cln_plugin.go +++ b/cln_plugin/cln_plugin.go @@ -126,13 +126,13 @@ func (c *ClnPlugin) listenRequests() error { } // Listens to responses to htlc_accepted requests from the grpc server. -func (c *ClnPlugin) listenServer() { +func (c *ClnPlugin) htlcListenServer() { for { select { case <-c.done: return default: - id, result := c.server.Receive() + id, result := c.server.ReceiveHtlcResolution() // The server may return nil if it is stopped. if result == nil { @@ -227,6 +227,8 @@ func (c *ClnPlugin) processRequest(request *Request) { }) case "setchannelacceptscript": c.handleSetChannelAcceptScript(request) + case "custommsg": + c.handleCustomMsg(request) default: c.sendError( request.Id, @@ -404,8 +406,8 @@ func (c *ClnPlugin) handleInit(request *Request) { return } - // Listen for responses from the grpc server. - go c.listenServer() + // Listen for htlc responses from the grpc server. + go c.htlcListenServer() // Let cln know the plugin is initialized. c.sendToCln(&Response{ @@ -436,7 +438,7 @@ func (c *ClnPlugin) handleHtlcAccepted(request *Request) { return } - c.server.Send(idToString(request.Id), &htlc) + c.server.SendHtlcAccepted(idToString(request.Id), &htlc) } func (c *ClnPlugin) handleSetChannelAcceptScript(request *Request) { diff --git a/cln_plugin/server.go b/cln_plugin/server.go index b03c97c..8b8160f 100644 --- a/cln_plugin/server.go +++ b/cln_plugin/server.go @@ -31,14 +31,14 @@ type server struct { subscriberTimeout time.Duration grpcServer *grpc.Server mtx sync.Mutex - stream proto.ClnPlugin_HtlcStreamServer - newSubscriber chan struct{} started chan struct{} done chan struct{} completed chan struct{} startError chan error - sendQueue chan *htlcAcceptedMsg - recvQueue chan *htlcResultMsg + htlcnewSubscriber chan struct{} + htlcStream proto.ClnPlugin_HtlcStreamServer + htlcSendQueue chan *htlcAcceptedMsg + htlcRecvQueue chan *htlcResultMsg } // Creates a new grpc server @@ -48,13 +48,13 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server { listenAddress: listenAddress, subscriberTimeout: subscriberTimeout, // The send queue exists to buffer messages until a subscriber is active. - sendQueue: make(chan *htlcAcceptedMsg, 10000), + htlcSendQueue: make(chan *htlcAcceptedMsg, 10000), // The receive queue exists mainly to allow returning timeouts to the // cln plugin. If there is no subscriber active within the subscriber // timeout period these results can be put directly on the receive queue. - recvQueue: make(chan *htlcResultMsg, 10000), - started: make(chan struct{}), - startError: make(chan error, 1), + htlcRecvQueue: make(chan *htlcResultMsg, 10000), + started: make(chan struct{}), + startError: make(chan error, 1), } } @@ -78,7 +78,7 @@ func (s *server) Start() error { s.done = make(chan struct{}) s.completed = make(chan struct{}) - s.newSubscriber = make(chan struct{}) + s.htlcnewSubscriber = make(chan struct{}) s.grpcServer = grpc.NewServer( grpc.KeepaliveParams(keepalive.ServerParameters{ Time: time.Duration(1) * time.Second, @@ -132,7 +132,7 @@ func (s *server) Stop() { // from or to the subscriber, the subscription is closed. func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { s.mtx.Lock() - if s.stream == nil { + if s.htlcStream == nil { log.Printf("Got a new HTLC stream subscription request.") } else { s.mtx.Unlock() @@ -141,12 +141,12 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { return fmt.Errorf("already subscribed") } - s.stream = stream + s.htlcStream = stream // Notify listeners that a new subscriber is active. Replace the chan with // a new one immediately in case this subscriber is dropped later. - close(s.newSubscriber) - s.newSubscriber = make(chan struct{}) + close(s.htlcnewSubscriber) + s.htlcnewSubscriber = make(chan struct{}) s.mtx.Unlock() <-stream.Context().Done() @@ -154,15 +154,15 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { // Remove the subscriber. s.mtx.Lock() - s.stream = nil + s.htlcStream = nil s.mtx.Unlock() return stream.Context().Err() } // Enqueues a htlc_accepted message for send to the grpc client. -func (s *server) Send(id string, h *HtlcAccepted) { - s.sendQueue <- &htlcAcceptedMsg{ +func (s *server) SendHtlcAccepted(id string, h *HtlcAccepted) { + s.htlcSendQueue <- &htlcAcceptedMsg{ id: id, htlc: h, timeout: time.Now().Add(s.subscriberTimeout), @@ -173,11 +173,11 @@ func (s *server) Send(id string, h *HtlcAccepted) { // and message. Blocks until a message is available. Returns a nil message if // the server is done. This function effectively waits until a subscriber is // active and has sent a message. -func (s *server) Receive() (string, interface{}) { +func (s *server) ReceiveHtlcResolution() (string, interface{}) { select { case <-s.done: return "", nil - case msg := <-s.recvQueue: + case msg := <-s.htlcRecvQueue: return msg.id, msg.result } } @@ -191,7 +191,7 @@ func (s *server) listenHtlcRequests() { case <-s.done: log.Printf("listenHtlcRequests received done. Stop listening.") return - case msg := <-s.sendQueue: + case msg := <-s.htlcSendQueue: s.handleHtlcAccepted(msg) } } @@ -202,8 +202,8 @@ func (s *server) listenHtlcRequests() { func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) { for { s.mtx.Lock() - stream := s.stream - ns := s.newSubscriber + stream := s.htlcStream + ns := s.htlcnewSubscriber s.mtx.Unlock() // If there is no active subscription, wait until there is a new @@ -228,7 +228,7 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) { // If the subscriber timeout expires while holding the htlc // we short circuit the htlc by sending the default result // (continue) to cln. - s.recvQueue <- &htlcResultMsg{ + s.htlcRecvQueue <- &htlcResultMsg{ id: msg.id, result: s.defaultResult(), } @@ -283,10 +283,10 @@ func (s *server) listenHtlcResponses() { log.Printf("listenHtlcResponses received done. Stopping listening.") return default: - resp := s.recv() - s.recvQueue <- &htlcResultMsg{ + resp := s.recvHtlcResolution() + s.htlcRecvQueue <- &htlcResultMsg{ id: resp.Correlationid, - result: s.mapResult(resp.Outcome), + result: s.mapHtlcResult(resp.Outcome), } } } @@ -295,14 +295,14 @@ func (s *server) listenHtlcResponses() { // Helper function that blocks until a message from a grpc client is received // or the server stops. Either returns a received message, or nil if the server // has stopped. -func (s *server) recv() *proto.HtlcResolution { +func (s *server) recvHtlcResolution() *proto.HtlcResolution { for { // make a copy of the used fields, to make sure state updates don't // surprise us. The newSubscriber chan is swapped whenever a new // subscriber arrives. s.mtx.Lock() - stream := s.stream - ns := s.newSubscriber + stream := s.htlcStream + ns := s.htlcnewSubscriber s.mtx.Unlock() if stream == nil { @@ -336,7 +336,7 @@ func (s *server) recv() *proto.HtlcResolution { // Maps a grpc result to the corresponding result for cln. The cln message // is a raw json message, so it's easiest to use a map directly. -func (s *server) mapResult(outcome interface{}) interface{} { +func (s *server) mapHtlcResult(outcome interface{}) interface{} { // result: continue cont, ok := outcome.(*proto.HtlcResolution_Continue) if ok {