organize code and cleanup

This commit is contained in:
Jesse de Wit
2022-12-30 12:01:33 +01:00
parent 2bea61d8e7
commit 38dabe5685
3 changed files with 183 additions and 166 deletions

View File

@@ -60,43 +60,14 @@ func NewClnPlugin(in, out *os.File) *ClnPlugin {
// NOTE: The grpc server is started in the handleInit function. // NOTE: The grpc server is started in the handleInit function.
func (c *ClnPlugin) Start() { func (c *ClnPlugin) Start() {
c.setupLogging() c.setupLogging()
go c.listen() go c.listenRequests()
<-c.done <-c.done
} }
func (c *ClnPlugin) setupLogging() {
in, out := io.Pipe()
go func(in io.Reader) {
// everytime we get a new message, log it thru c-lightning
scanner := bufio.NewScanner(in)
for {
select {
case <-c.done:
return
default:
if !scanner.Scan() {
if err := scanner.Err(); err != nil {
log.Fatalf(
"can't print out to std err, killing: %v",
err,
)
}
}
for _, line := range strings.Split(scanner.Text(), "\n") {
c.log("info", line)
}
}
}
}(in)
log.SetFlags(log.Ltime | log.Lshortfile)
log.SetOutput(out)
}
// Stops the cln plugin. Drops any remaining work immediately. // Stops the cln plugin. Drops any remaining work immediately.
// Pending htlcs will be replayed when cln starts again. // Pending htlcs will be replayed when cln starts again.
func (c *ClnPlugin) Stop() { func (c *ClnPlugin) Stop() {
log.Printf("Stop called. Stopping plugin.")
close(c.done) close(c.done)
s := c.server s := c.server
@@ -107,7 +78,7 @@ func (c *ClnPlugin) Stop() {
// listens stdout for requests from cln and sends the requests to the // listens stdout for requests from cln and sends the requests to the
// appropriate handler in fifo order. // appropriate handler in fifo order.
func (c *ClnPlugin) listen() error { func (c *ClnPlugin) listenRequests() error {
scanner := bufio.NewScanner(c.in) scanner := bufio.NewScanner(c.in)
buf := make([]byte, 1024) buf := make([]byte, 1024)
scanner.Buffer(buf, MaxIntakeBuffer) scanner.Buffer(buf, MaxIntakeBuffer)
@@ -129,8 +100,10 @@ func (c *ClnPlugin) listen() error {
} }
msg := scanner.Bytes() msg := scanner.Bytes()
// TODO: Pipe logs to the proper place.
// Always log the message json
log.Println(string(msg)) log.Println(string(msg))
// pass down a copy so things stay sane // pass down a copy so things stay sane
msg_buf := make([]byte, len(msg)) msg_buf := make([]byte, len(msg))
copy(msg_buf, msg) copy(msg_buf, msg)
@@ -141,15 +114,39 @@ func (c *ClnPlugin) listen() error {
} }
} }
// Listens to responses to htlc_accepted requests from the grpc server.
func (c *ClnPlugin) listenServer() {
for {
select {
case <-c.done:
return
default:
id, result := c.server.Receive()
// The server may return nil if it is stopped.
if result == nil {
continue
}
serid, _ := json.Marshal(&id)
c.sendToCln(&Response{
Id: serid,
JsonRpc: SpecVersion,
Result: result,
})
}
}
}
// processes a single message from cln. Sends the message to the appropriate // processes a single message from cln. Sends the message to the appropriate
// handler. // handler.
func (c *ClnPlugin) processMsg(msg []byte) { func (c *ClnPlugin) processMsg(msg []byte) {
if len(msg) == 0 { if len(msg) == 0 {
c.sendError(nil, InvalidRequest, "Invalid Request") c.sendError(nil, InvalidRequest, "Got an invalid zero length request")
return return
} }
// Right now we don't handle arrays of requests... // Handle request batches.
if msg[0] == '[' { if msg[0] == '[' {
var requests []*Request var requests []*Request
err := json.Unmarshal(msg, &requests) err := json.Unmarshal(msg, &requests)
@@ -157,7 +154,7 @@ func (c *ClnPlugin) processMsg(msg []byte) {
c.sendError( c.sendError(
nil, nil,
ParseError, ParseError,
fmt.Sprintf("Parse error:%s [%s]", err.Error(), msg), fmt.Sprintf("Failed to unmarshal request batch: %v", err),
) )
return return
} }
@@ -173,11 +170,10 @@ func (c *ClnPlugin) processMsg(msg []byte) {
var request Request var request Request
err := json.Unmarshal(msg, &request) err := json.Unmarshal(msg, &request)
if err != nil { if err != nil {
log.Printf("failed to unmarshal request: %v", err)
c.sendError( c.sendError(
nil, nil,
ParseError, ParseError,
fmt.Sprintf("Parse error:%s [%s]", err.Error(), msg), fmt.Sprintf("failed to unmarshal request: %v", err),
) )
return return
} }
@@ -188,15 +184,11 @@ func (c *ClnPlugin) processMsg(msg []byte) {
func (c *ClnPlugin) processRequest(request *Request) { func (c *ClnPlugin) processRequest(request *Request) {
// Make sure the spec version is expected. // Make sure the spec version is expected.
if request.JsonRpc != SpecVersion { if request.JsonRpc != SpecVersion {
c.sendError( c.sendError(request.Id, InvalidRequest, fmt.Sprintf(
request.Id,
InvalidRequest,
fmt.Sprintf(
`Invalid jsonrpc, expected '%s' got '%s'`, `Invalid jsonrpc, expected '%s' got '%s'`,
SpecVersion, SpecVersion,
request.JsonRpc, request.JsonRpc,
), ))
)
return return
} }
@@ -235,9 +227,8 @@ func (c *ClnPlugin) handleGetManifest(request *Request) {
{ {
Name: SubscriberTimeoutOption, Name: SubscriberTimeoutOption,
Type: "string", Type: "string",
Description: "htlc timeout duration when there is no " + Description: "the maximum duration we will hold a htlc " +
"subscriber to the grpc server. golang duration " + "if no subscriber is active. golang duration string.",
"string.",
Default: &DefaultSubscriberTimeout, Default: &DefaultSubscriberTimeout,
}, },
}, },
@@ -266,11 +257,7 @@ func (c *ClnPlugin) handleInit(request *Request) {
c.sendError( c.sendError(
request.Id, request.Id,
ParseError, ParseError,
fmt.Sprintf( fmt.Sprintf("Failed to unmarshal init params: %v", err),
"Error parsing init params:%s [%s]",
err.Error(),
request.Params,
),
) )
return return
} }
@@ -362,30 +349,6 @@ func (c *ClnPlugin) handleInit(request *Request) {
}) })
} }
// Listens to responses to htlc_accepted requests from the grpc server.
func (c *ClnPlugin) listenServer() {
for {
select {
case <-c.done:
return
default:
id, result := c.server.Receive()
// The server may return nil if it is stopped.
if result == nil {
continue
}
serid, _ := json.Marshal(&id)
c.sendToCln(&Response{
Id: serid,
JsonRpc: SpecVersion,
Result: result,
})
}
}
}
// Handles the shutdown message. Stops any work immediately. // Handles the shutdown message. Stops any work immediately.
func (c *ClnPlugin) handleShutdown(request *Request) { func (c *ClnPlugin) handleShutdown(request *Request) {
c.Stop() c.Stop()
@@ -400,7 +363,7 @@ func (c *ClnPlugin) handleHtlcAccepted(request *Request) {
request.Id, request.Id,
ParseError, ParseError,
fmt.Sprintf( fmt.Sprintf(
"Error parsing htlc_accepted params:%s [%s]", "Failed to unmarshal htlc_accepted params:%s [%s]",
err.Error(), err.Error(),
request.Params, request.Params,
), ),
@@ -408,11 +371,15 @@ func (c *ClnPlugin) handleHtlcAccepted(request *Request) {
return return
} }
c.server.Send(c.idToString(request.Id), &htlc) c.server.Send(idToString(request.Id), &htlc)
} }
// Sends an error to cln. // Sends an error to cln.
func (c *ClnPlugin) sendError(id json.RawMessage, code int, message string) { func (c *ClnPlugin) sendError(id json.RawMessage, code int, message string) {
// Log the error to cln first.
c.log("error", message)
// Then create an error message.
resp := &Response{ resp := &Response{
JsonRpc: SpecVersion, JsonRpc: SpecVersion,
Error: &RpcError{ Error: &RpcError{
@@ -428,29 +395,14 @@ func (c *ClnPlugin) sendError(id json.RawMessage, code int, message string) {
c.sendToCln(resp) c.sendToCln(resp)
} }
// converts a raw cln id to string. The CLN id can either be an integer or a
// string. if it's a string, the quotes are removed.
func (c *ClnPlugin) idToString(id json.RawMessage) string {
if len(id) == 0 {
return ""
}
str := string(id)
str = strings.TrimSpace(str)
str = strings.Trim(str, "\"")
str = strings.Trim(str, "'")
return str
}
// Sends a message to cln. // Sends a message to cln.
func (c *ClnPlugin) sendToCln(msg interface{}) { func (c *ClnPlugin) sendToCln(msg interface{}) {
c.writeMtx.Lock() c.writeMtx.Lock()
defer c.writeMtx.Unlock() defer c.writeMtx.Unlock()
// TODO: log
data, err := json.Marshal(msg) data, err := json.Marshal(msg)
if err != nil { if err != nil {
log.Println(err.Error()) log.Printf("Failed to marshal message for cln, ignoring message: %+v", msg)
return return
} }
@@ -459,6 +411,36 @@ func (c *ClnPlugin) sendToCln(msg interface{}) {
c.out.Flush() c.out.Flush()
} }
func (c *ClnPlugin) setupLogging() {
in, out := io.Pipe()
log.SetFlags(log.Ltime | log.Lshortfile)
log.SetOutput(out)
go func(in io.Reader) {
// everytime we get a new message, log it thru c-lightning
scanner := bufio.NewScanner(in)
for {
select {
case <-c.done:
return
default:
if !scanner.Scan() {
if err := scanner.Err(); err != nil {
log.Fatalf(
"can't print out to std err, killing: %v",
err,
)
}
}
for _, line := range strings.Split(scanner.Text(), "\n") {
c.log("info", line)
}
}
}
}(in)
}
func (c *ClnPlugin) log(level string, message string) { func (c *ClnPlugin) log(level string, message string) {
params, _ := json.Marshal(&LogNotification{ params, _ := json.Marshal(&LogNotification{
Level: level, Level: level,
@@ -486,3 +468,17 @@ func scanDoubleNewline(
// the buffer if we're at EOF, with no /n/n present // the buffer if we're at EOF, with no /n/n present
return 0, nil, nil return 0, nil, nil
} }
// converts a raw cln id to string. The CLN id can either be an integer or a
// string. if it's a string, the quotes are removed.
func idToString(id json.RawMessage) string {
if len(id) == 0 {
return ""
}
str := string(id)
str = strings.TrimSpace(str)
str = strings.Trim(str, "\"")
str = strings.Trim(str, "'")
return str
}

View File

@@ -23,7 +23,7 @@ type subscription struct {
type htlcAcceptedMsg struct { type htlcAcceptedMsg struct {
id string id string
htlc *HtlcAccepted htlc *HtlcAccepted
timeout <-chan time.Time timeout time.Time
} }
// Internal htlc result message meant for the recvQueue. // Internal htlc result message meant for the recvQueue.
@@ -53,7 +53,11 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server {
return &server{ return &server{
listenAddress: listenAddress, listenAddress: listenAddress,
subscriberTimeout: subscriberTimeout, subscriberTimeout: subscriberTimeout,
// The send queue exists to buffer messages until a subscriber is active.
sendQueue: make(chan *htlcAcceptedMsg, 10000), sendQueue: make(chan *htlcAcceptedMsg, 10000),
// The receive queue exists mainly to allow returning timeouts to the
// cln plugin. If there is no subscriber active within the subscriber
// timeout period these results can be put directly on the receive queue.
recvQueue: make(chan *htlcResultMsg, 10000), recvQueue: make(chan *htlcResultMsg, 10000),
started: make(chan struct{}), started: make(chan struct{}),
startError: make(chan error, 1), startError: make(chan error, 1),
@@ -113,16 +117,20 @@ func (s *server) Stop() {
close(s.done) close(s.done)
s.grpcServer.Stop() s.grpcServer.Stop()
s.grpcServer = nil s.grpcServer = nil
log.Printf("Server stopped.")
} }
// Grpc method that is called when a new client subscribes. There can only be // Grpc method that is called when a new client subscribes. There can only be
// one subscriber active at a time. If there is an error receiving or sending // one subscriber active at a time. If there is an error receiving or sending
// from or to the subscriber, the subscription is closed. // from or to the subscriber, the subscription is closed.
func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
log.Printf("Got HTLC stream subscription request.")
s.mtx.Lock() s.mtx.Lock()
if s.subscription != nil { if s.subscription == nil {
log.Printf("Got a new HTLC stream subscription request.")
} else {
s.mtx.Unlock() s.mtx.Unlock()
log.Printf("Got a HTLC stream subscription request, but subscription " +
"was already active.")
return fmt.Errorf("already subscribed") return fmt.Errorf("already subscribed")
} }
@@ -140,10 +148,17 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
s.mtx.Unlock() s.mtx.Unlock()
defer func() { defer func() {
// When the HtlcStream function returns, that means the subscriber will
// be gone. Cleanup the subscription so we'll be ready to accept a new
// one later.
s.removeSubscriptionIfUnchanged(sb, nil) s.removeSubscriptionIfUnchanged(sb, nil)
}() }()
go func() { go func() {
// If the context is done, there will be no more connection with the
// client. Listen for context done and clean up the subscriber.
// Cleaning up the subscriber will make the HtlcStream function exit.
// (sb.done or sb.err)
<-stream.Context().Done() <-stream.Context().Done()
log.Printf("HtlcStream context is done. Removing subscriber: %v", stream.Context().Err()) log.Printf("HtlcStream context is done. Removing subscriber: %v", stream.Context().Err())
s.removeSubscriptionIfUnchanged(sb, stream.Context().Err()) s.removeSubscriptionIfUnchanged(sb, stream.Context().Err())
@@ -167,13 +182,14 @@ func (s *server) Send(id string, h *HtlcAccepted) {
s.sendQueue <- &htlcAcceptedMsg{ s.sendQueue <- &htlcAcceptedMsg{
id: id, id: id,
htlc: h, htlc: h,
timeout: time.After(s.subscriberTimeout), timeout: time.Now().Add(s.subscriberTimeout),
} }
} }
// Receives the next htlc resolution message from the grpc client. Returns id // Receives the next htlc resolution message from the grpc client. Returns id
// and message. Blocks until a message is available. Returns a nil message if // and message. Blocks until a message is available. Returns a nil message if
// the server is done. // the server is done. This function effectively waits until a subscriber is
// active and has sent a message.
func (s *server) Receive() (string, interface{}) { func (s *server) Receive() (string, interface{}) {
select { select {
case <-s.done: case <-s.done:
@@ -183,63 +199,6 @@ func (s *server) Receive() (string, interface{}) {
} }
} }
// Helper function that blocks until a message from a grpc client is received
// or the server stops. Either returns a received message, or nil if the server
// has stopped.
func (s *server) recv() *proto.HtlcResolution {
for {
// make a copy of the used fields, to make sure state updates don't
// surprise us. The newSubscriber chan is swapped whenever a new
// subscriber arrives.
s.mtx.Lock()
sb := s.subscription
ns := s.newSubscriber
s.mtx.Unlock()
if sb == nil {
log.Printf("Got no subscribers for receive. Waiting for subscriber.")
select {
case <-s.done:
log.Printf("Done signalled, stopping receive.")
return nil
case <-ns:
log.Printf("New subscription available for receive, continue receive.")
continue
}
}
// There is a subscription active. Attempt to receive a message.
r, err := sb.stream.Recv()
if err == nil {
log.Printf("Received HtlcResolution %+v", r)
return r
}
// Receiving the message failed, so the subscription is broken. Remove
// it if it hasn't been updated already. We'll try receiving again in
// the next iteration of the for loop.
log.Printf("Recv() errored, removing subscription: %v", err)
s.removeSubscriptionIfUnchanged(sb, err)
}
}
// Stops and removes the subscription if this is the currently active
// subscription. If the subscription was changed in the meantime, this function
// does nothing.
func (s *server) removeSubscriptionIfUnchanged(sb *subscription, err error) {
s.mtx.Lock()
// If the subscription reference hasn't changed yet in the meantime, kill it.
if s.subscription == sb {
if err == nil {
close(sb.done)
} else {
sb.err <- err
}
s.subscription = nil
}
s.mtx.Unlock()
}
// Listens to sendQueue for htlc_accepted requests from cln. The message will be // Listens to sendQueue for htlc_accepted requests from cln. The message will be
// held until a subscriber is active, or the subscriber timeout expires. The // held until a subscriber is active, or the subscriber timeout expires. The
// messages are sent to the grpc client in fifo order. // messages are sent to the grpc client in fifo order.
@@ -274,7 +233,7 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
case <-ns: case <-ns:
log.Printf("got a new subscriber. continue handleHtlcAccepted.") log.Printf("got a new subscriber. continue handleHtlcAccepted.")
continue continue
case <-msg.timeout: case <-time.After(time.Until(msg.timeout)):
log.Printf( log.Printf(
"WARNING: htlc with id '%s' timed out after '%v' waiting "+ "WARNING: htlc with id '%s' timed out after '%v' waiting "+
"for grpc subscriber: %+v", "for grpc subscriber: %+v",
@@ -345,6 +304,67 @@ func (s *server) listenHtlcResponses() {
} }
} }
// Helper function that blocks until a message from a grpc client is received
// or the server stops. Either returns a received message, or nil if the server
// has stopped.
func (s *server) recv() *proto.HtlcResolution {
for {
// make a copy of the used fields, to make sure state updates don't
// surprise us. The newSubscriber chan is swapped whenever a new
// subscriber arrives.
s.mtx.Lock()
sb := s.subscription
ns := s.newSubscriber
s.mtx.Unlock()
if sb == nil {
log.Printf("Got no subscribers for receive. Waiting for subscriber.")
select {
case <-s.done:
log.Printf("Done signalled, stopping receive.")
return nil
case <-ns:
log.Printf("New subscription available for receive, continue receive.")
continue
}
}
// There is a subscription active. Attempt to receive a message.
r, err := sb.stream.Recv()
if err == nil {
log.Printf("Received HtlcResolution %+v", r)
return r
}
// Receiving the message failed, so the subscription is broken. Remove
// it if it hasn't been updated already. We'll try receiving again in
// the next iteration of the for loop.
log.Printf("Recv() errored, removing subscription: %v", err)
s.removeSubscriptionIfUnchanged(sb, err)
}
}
// Stops and removes the subscription if this is the currently active
// subscription. If the subscription was changed in the meantime, this function
// does nothing.
func (s *server) removeSubscriptionIfUnchanged(sb *subscription, err error) {
s.mtx.Lock()
// If the subscription reference hasn't changed yet in the meantime, kill it.
if s.subscription == sb {
if err == nil {
log.Printf("Removing active subscription without error.")
close(sb.done)
} else {
log.Printf("Removing active subscription with error: %v", err)
sb.err <- err
}
s.subscription = nil
} else {
log.Printf("removeSubscriptionIfUnchanged: Subscription already removed.")
}
s.mtx.Unlock()
}
// Maps a grpc result to the corresponding result for cln. The cln message // Maps a grpc result to the corresponding result for cln. The cln message
// is a raw json message, so it's easiest to use a map directly. // is a raw json message, so it's easiest to use a map directly.
func (s *server) mapResult(outcome interface{}) interface{} { func (s *server) mapResult(outcome interface{}) interface{} {

View File

@@ -41,6 +41,7 @@ func NewPostgresContainer(logfile string) (*PostgresContainer, error) {
return &PostgresContainer{ return &PostgresContainer{
password: "pgpassword", password: "pgpassword",
port: port, port: port,
logfile: logfile,
}, nil }, nil
} }
@@ -207,9 +208,9 @@ func (c *PostgresContainer) monitorLogs(ctx context.Context) {
} }
defer i.Close() defer i.Close()
file, err := os.OpenFile(c.logfile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600) file, err := os.Create(c.logfile)
if err != nil { if err != nil {
log.Printf("Could not create container log file: %v", err) log.Printf("Could not create container log file %v: %v", c.logfile, err)
return return
} }
defer file.Close() defer file.Close()