diff --git a/cln_plugin/cln_plugin.go b/cln_plugin/cln_plugin.go index 987fba6..4b217f0 100644 --- a/cln_plugin/cln_plugin.go +++ b/cln_plugin/cln_plugin.go @@ -60,43 +60,14 @@ func NewClnPlugin(in, out *os.File) *ClnPlugin { // NOTE: The grpc server is started in the handleInit function. func (c *ClnPlugin) Start() { c.setupLogging() - go c.listen() + go c.listenRequests() <-c.done } -func (c *ClnPlugin) setupLogging() { - in, out := io.Pipe() - go func(in io.Reader) { - // everytime we get a new message, log it thru c-lightning - scanner := bufio.NewScanner(in) - for { - select { - case <-c.done: - return - default: - if !scanner.Scan() { - if err := scanner.Err(); err != nil { - log.Fatalf( - "can't print out to std err, killing: %v", - err, - ) - } - } - - for _, line := range strings.Split(scanner.Text(), "\n") { - c.log("info", line) - } - } - } - - }(in) - log.SetFlags(log.Ltime | log.Lshortfile) - log.SetOutput(out) -} - // Stops the cln plugin. Drops any remaining work immediately. // Pending htlcs will be replayed when cln starts again. func (c *ClnPlugin) Stop() { + log.Printf("Stop called. Stopping plugin.") close(c.done) s := c.server @@ -107,7 +78,7 @@ func (c *ClnPlugin) Stop() { // listens stdout for requests from cln and sends the requests to the // appropriate handler in fifo order. -func (c *ClnPlugin) listen() error { +func (c *ClnPlugin) listenRequests() error { scanner := bufio.NewScanner(c.in) buf := make([]byte, 1024) scanner.Buffer(buf, MaxIntakeBuffer) @@ -129,8 +100,10 @@ func (c *ClnPlugin) listen() error { } msg := scanner.Bytes() - // TODO: Pipe logs to the proper place. + + // Always log the message json log.Println(string(msg)) + // pass down a copy so things stay sane msg_buf := make([]byte, len(msg)) copy(msg_buf, msg) @@ -141,15 +114,39 @@ func (c *ClnPlugin) listen() error { } } +// Listens to responses to htlc_accepted requests from the grpc server. +func (c *ClnPlugin) listenServer() { + for { + select { + case <-c.done: + return + default: + id, result := c.server.Receive() + + // The server may return nil if it is stopped. + if result == nil { + continue + } + + serid, _ := json.Marshal(&id) + c.sendToCln(&Response{ + Id: serid, + JsonRpc: SpecVersion, + Result: result, + }) + } + } +} + // processes a single message from cln. Sends the message to the appropriate // handler. func (c *ClnPlugin) processMsg(msg []byte) { if len(msg) == 0 { - c.sendError(nil, InvalidRequest, "Invalid Request") + c.sendError(nil, InvalidRequest, "Got an invalid zero length request") return } - // Right now we don't handle arrays of requests... + // Handle request batches. if msg[0] == '[' { var requests []*Request err := json.Unmarshal(msg, &requests) @@ -157,7 +154,7 @@ func (c *ClnPlugin) processMsg(msg []byte) { c.sendError( nil, ParseError, - fmt.Sprintf("Parse error:%s [%s]", err.Error(), msg), + fmt.Sprintf("Failed to unmarshal request batch: %v", err), ) return } @@ -173,11 +170,10 @@ func (c *ClnPlugin) processMsg(msg []byte) { var request Request err := json.Unmarshal(msg, &request) if err != nil { - log.Printf("failed to unmarshal request: %v", err) c.sendError( nil, ParseError, - fmt.Sprintf("Parse error:%s [%s]", err.Error(), msg), + fmt.Sprintf("failed to unmarshal request: %v", err), ) return } @@ -188,15 +184,11 @@ func (c *ClnPlugin) processMsg(msg []byte) { func (c *ClnPlugin) processRequest(request *Request) { // Make sure the spec version is expected. if request.JsonRpc != SpecVersion { - c.sendError( - request.Id, - InvalidRequest, - fmt.Sprintf( - `Invalid jsonrpc, expected '%s' got '%s'`, - SpecVersion, - request.JsonRpc, - ), - ) + c.sendError(request.Id, InvalidRequest, fmt.Sprintf( + `Invalid jsonrpc, expected '%s' got '%s'`, + SpecVersion, + request.JsonRpc, + )) return } @@ -235,9 +227,8 @@ func (c *ClnPlugin) handleGetManifest(request *Request) { { Name: SubscriberTimeoutOption, Type: "string", - Description: "htlc timeout duration when there is no " + - "subscriber to the grpc server. golang duration " + - "string.", + Description: "the maximum duration we will hold a htlc " + + "if no subscriber is active. golang duration string.", Default: &DefaultSubscriberTimeout, }, }, @@ -266,11 +257,7 @@ func (c *ClnPlugin) handleInit(request *Request) { c.sendError( request.Id, ParseError, - fmt.Sprintf( - "Error parsing init params:%s [%s]", - err.Error(), - request.Params, - ), + fmt.Sprintf("Failed to unmarshal init params: %v", err), ) return } @@ -362,30 +349,6 @@ func (c *ClnPlugin) handleInit(request *Request) { }) } -// Listens to responses to htlc_accepted requests from the grpc server. -func (c *ClnPlugin) listenServer() { - for { - select { - case <-c.done: - return - default: - id, result := c.server.Receive() - - // The server may return nil if it is stopped. - if result == nil { - continue - } - - serid, _ := json.Marshal(&id) - c.sendToCln(&Response{ - Id: serid, - JsonRpc: SpecVersion, - Result: result, - }) - } - } -} - // Handles the shutdown message. Stops any work immediately. func (c *ClnPlugin) handleShutdown(request *Request) { c.Stop() @@ -400,7 +363,7 @@ func (c *ClnPlugin) handleHtlcAccepted(request *Request) { request.Id, ParseError, fmt.Sprintf( - "Error parsing htlc_accepted params:%s [%s]", + "Failed to unmarshal htlc_accepted params:%s [%s]", err.Error(), request.Params, ), @@ -408,11 +371,15 @@ func (c *ClnPlugin) handleHtlcAccepted(request *Request) { return } - c.server.Send(c.idToString(request.Id), &htlc) + c.server.Send(idToString(request.Id), &htlc) } // Sends an error to cln. func (c *ClnPlugin) sendError(id json.RawMessage, code int, message string) { + // Log the error to cln first. + c.log("error", message) + + // Then create an error message. resp := &Response{ JsonRpc: SpecVersion, Error: &RpcError{ @@ -428,29 +395,14 @@ func (c *ClnPlugin) sendError(id json.RawMessage, code int, message string) { c.sendToCln(resp) } -// converts a raw cln id to string. The CLN id can either be an integer or a -// string. if it's a string, the quotes are removed. -func (c *ClnPlugin) idToString(id json.RawMessage) string { - if len(id) == 0 { - return "" - } - - str := string(id) - str = strings.TrimSpace(str) - str = strings.Trim(str, "\"") - str = strings.Trim(str, "'") - return str -} - // Sends a message to cln. func (c *ClnPlugin) sendToCln(msg interface{}) { c.writeMtx.Lock() defer c.writeMtx.Unlock() - // TODO: log data, err := json.Marshal(msg) if err != nil { - log.Println(err.Error()) + log.Printf("Failed to marshal message for cln, ignoring message: %+v", msg) return } @@ -459,6 +411,36 @@ func (c *ClnPlugin) sendToCln(msg interface{}) { c.out.Flush() } +func (c *ClnPlugin) setupLogging() { + in, out := io.Pipe() + log.SetFlags(log.Ltime | log.Lshortfile) + log.SetOutput(out) + go func(in io.Reader) { + // everytime we get a new message, log it thru c-lightning + scanner := bufio.NewScanner(in) + for { + select { + case <-c.done: + return + default: + if !scanner.Scan() { + if err := scanner.Err(); err != nil { + log.Fatalf( + "can't print out to std err, killing: %v", + err, + ) + } + } + + for _, line := range strings.Split(scanner.Text(), "\n") { + c.log("info", line) + } + } + } + + }(in) +} + func (c *ClnPlugin) log(level string, message string) { params, _ := json.Marshal(&LogNotification{ Level: level, @@ -486,3 +468,17 @@ func scanDoubleNewline( // the buffer if we're at EOF, with no /n/n present return 0, nil, nil } + +// converts a raw cln id to string. The CLN id can either be an integer or a +// string. if it's a string, the quotes are removed. +func idToString(id json.RawMessage) string { + if len(id) == 0 { + return "" + } + + str := string(id) + str = strings.TrimSpace(str) + str = strings.Trim(str, "\"") + str = strings.Trim(str, "'") + return str +} diff --git a/cln_plugin/server.go b/cln_plugin/server.go index 680b83a..e852dab 100644 --- a/cln_plugin/server.go +++ b/cln_plugin/server.go @@ -23,7 +23,7 @@ type subscription struct { type htlcAcceptedMsg struct { id string htlc *HtlcAccepted - timeout <-chan time.Time + timeout time.Time } // Internal htlc result message meant for the recvQueue. @@ -53,10 +53,14 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server { return &server{ listenAddress: listenAddress, subscriberTimeout: subscriberTimeout, - sendQueue: make(chan *htlcAcceptedMsg, 10000), - recvQueue: make(chan *htlcResultMsg, 10000), - started: make(chan struct{}), - startError: make(chan error, 1), + // The send queue exists to buffer messages until a subscriber is active. + sendQueue: make(chan *htlcAcceptedMsg, 10000), + // The receive queue exists mainly to allow returning timeouts to the + // cln plugin. If there is no subscriber active within the subscriber + // timeout period these results can be put directly on the receive queue. + recvQueue: make(chan *htlcResultMsg, 10000), + started: make(chan struct{}), + startError: make(chan error, 1), } } @@ -113,16 +117,20 @@ func (s *server) Stop() { close(s.done) s.grpcServer.Stop() s.grpcServer = nil + log.Printf("Server stopped.") } // Grpc method that is called when a new client subscribes. There can only be // one subscriber active at a time. If there is an error receiving or sending // from or to the subscriber, the subscription is closed. func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { - log.Printf("Got HTLC stream subscription request.") s.mtx.Lock() - if s.subscription != nil { + if s.subscription == nil { + log.Printf("Got a new HTLC stream subscription request.") + } else { s.mtx.Unlock() + log.Printf("Got a HTLC stream subscription request, but subscription " + + "was already active.") return fmt.Errorf("already subscribed") } @@ -140,10 +148,17 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { s.mtx.Unlock() defer func() { + // When the HtlcStream function returns, that means the subscriber will + // be gone. Cleanup the subscription so we'll be ready to accept a new + // one later. s.removeSubscriptionIfUnchanged(sb, nil) }() go func() { + // If the context is done, there will be no more connection with the + // client. Listen for context done and clean up the subscriber. + // Cleaning up the subscriber will make the HtlcStream function exit. + // (sb.done or sb.err) <-stream.Context().Done() log.Printf("HtlcStream context is done. Removing subscriber: %v", stream.Context().Err()) s.removeSubscriptionIfUnchanged(sb, stream.Context().Err()) @@ -167,13 +182,14 @@ func (s *server) Send(id string, h *HtlcAccepted) { s.sendQueue <- &htlcAcceptedMsg{ id: id, htlc: h, - timeout: time.After(s.subscriberTimeout), + timeout: time.Now().Add(s.subscriberTimeout), } } // Receives the next htlc resolution message from the grpc client. Returns id // and message. Blocks until a message is available. Returns a nil message if -// the server is done. +// the server is done. This function effectively waits until a subscriber is +// active and has sent a message. func (s *server) Receive() (string, interface{}) { select { case <-s.done: @@ -183,63 +199,6 @@ func (s *server) Receive() (string, interface{}) { } } -// Helper function that blocks until a message from a grpc client is received -// or the server stops. Either returns a received message, or nil if the server -// has stopped. -func (s *server) recv() *proto.HtlcResolution { - for { - // make a copy of the used fields, to make sure state updates don't - // surprise us. The newSubscriber chan is swapped whenever a new - // subscriber arrives. - s.mtx.Lock() - sb := s.subscription - ns := s.newSubscriber - s.mtx.Unlock() - - if sb == nil { - log.Printf("Got no subscribers for receive. Waiting for subscriber.") - select { - case <-s.done: - log.Printf("Done signalled, stopping receive.") - return nil - case <-ns: - log.Printf("New subscription available for receive, continue receive.") - continue - } - } - - // There is a subscription active. Attempt to receive a message. - r, err := sb.stream.Recv() - if err == nil { - log.Printf("Received HtlcResolution %+v", r) - return r - } - - // Receiving the message failed, so the subscription is broken. Remove - // it if it hasn't been updated already. We'll try receiving again in - // the next iteration of the for loop. - log.Printf("Recv() errored, removing subscription: %v", err) - s.removeSubscriptionIfUnchanged(sb, err) - } -} - -// Stops and removes the subscription if this is the currently active -// subscription. If the subscription was changed in the meantime, this function -// does nothing. -func (s *server) removeSubscriptionIfUnchanged(sb *subscription, err error) { - s.mtx.Lock() - // If the subscription reference hasn't changed yet in the meantime, kill it. - if s.subscription == sb { - if err == nil { - close(sb.done) - } else { - sb.err <- err - } - s.subscription = nil - } - s.mtx.Unlock() -} - // Listens to sendQueue for htlc_accepted requests from cln. The message will be // held until a subscriber is active, or the subscriber timeout expires. The // messages are sent to the grpc client in fifo order. @@ -274,7 +233,7 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) { case <-ns: log.Printf("got a new subscriber. continue handleHtlcAccepted.") continue - case <-msg.timeout: + case <-time.After(time.Until(msg.timeout)): log.Printf( "WARNING: htlc with id '%s' timed out after '%v' waiting "+ "for grpc subscriber: %+v", @@ -345,6 +304,67 @@ func (s *server) listenHtlcResponses() { } } +// Helper function that blocks until a message from a grpc client is received +// or the server stops. Either returns a received message, or nil if the server +// has stopped. +func (s *server) recv() *proto.HtlcResolution { + for { + // make a copy of the used fields, to make sure state updates don't + // surprise us. The newSubscriber chan is swapped whenever a new + // subscriber arrives. + s.mtx.Lock() + sb := s.subscription + ns := s.newSubscriber + s.mtx.Unlock() + + if sb == nil { + log.Printf("Got no subscribers for receive. Waiting for subscriber.") + select { + case <-s.done: + log.Printf("Done signalled, stopping receive.") + return nil + case <-ns: + log.Printf("New subscription available for receive, continue receive.") + continue + } + } + + // There is a subscription active. Attempt to receive a message. + r, err := sb.stream.Recv() + if err == nil { + log.Printf("Received HtlcResolution %+v", r) + return r + } + + // Receiving the message failed, so the subscription is broken. Remove + // it if it hasn't been updated already. We'll try receiving again in + // the next iteration of the for loop. + log.Printf("Recv() errored, removing subscription: %v", err) + s.removeSubscriptionIfUnchanged(sb, err) + } +} + +// Stops and removes the subscription if this is the currently active +// subscription. If the subscription was changed in the meantime, this function +// does nothing. +func (s *server) removeSubscriptionIfUnchanged(sb *subscription, err error) { + s.mtx.Lock() + // If the subscription reference hasn't changed yet in the meantime, kill it. + if s.subscription == sb { + if err == nil { + log.Printf("Removing active subscription without error.") + close(sb.done) + } else { + log.Printf("Removing active subscription with error: %v", err) + sb.err <- err + } + s.subscription = nil + } else { + log.Printf("removeSubscriptionIfUnchanged: Subscription already removed.") + } + s.mtx.Unlock() +} + // Maps a grpc result to the corresponding result for cln. The cln message // is a raw json message, so it's easiest to use a map directly. func (s *server) mapResult(outcome interface{}) interface{} { diff --git a/itest/postgres.go b/itest/postgres.go index a8c4f54..5cb5047 100644 --- a/itest/postgres.go +++ b/itest/postgres.go @@ -41,6 +41,7 @@ func NewPostgresContainer(logfile string) (*PostgresContainer, error) { return &PostgresContainer{ password: "pgpassword", port: port, + logfile: logfile, }, nil } @@ -207,9 +208,9 @@ func (c *PostgresContainer) monitorLogs(ctx context.Context) { } defer i.Close() - file, err := os.OpenFile(c.logfile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600) + file, err := os.Create(c.logfile) if err != nil { - log.Printf("Could not create container log file: %v", err) + log.Printf("Could not create container log file %v: %v", c.logfile, err) return } defer file.Close()