Mirror of https://github.com/aljazceru/lspd.git
use internal cln id as correlation id
@@ -8,64 +8,103 @@ import (
 	"sync"
 	"time"

+	"github.com/breez/lspd/cln_plugin/proto"
 	grpc "google.golang.org/grpc"
 )

-var receiveWaitDelay = time.Millisecond * 200
-
+// A subscription represents a grpc client that is connected to the server.
 type subscription struct {
-	stream ClnPlugin_HtlcStreamServer
+	stream proto.ClnPlugin_HtlcStreamServer
 	done   chan struct{}
 	err    chan error
 }

-type server struct {
-	ClnPluginServer
-	listenAddress string
-	grpcServer    *grpc.Server
-	startMtx      sync.Mutex
-	corrMtx       sync.Mutex
-	subscription  *subscription
-	newSubscriber chan struct{}
-	done          chan struct{}
-	correlations  map[uint64]chan *HtlcResolution
-	index         uint64
-	err           chan error
-}
-
-func NewServer(listenAddress string) *server {
+// Internal htlc_accepted message meant for the sendQueue.
+type htlcAcceptedMsg struct {
+	id      string
+	htlc    *HtlcAccepted
+	timeout <-chan time.Time
+}
+
+// Internal htlc result message meant for the recvQueue.
+type htlcResultMsg struct {
+	id     string
+	result interface{}
+}
+
+type server struct {
+	proto.ClnPluginServer
+	listenAddress     string
+	subscriberTimeout time.Duration
+	grpcServer        *grpc.Server
+	mtx               sync.Mutex
+	subscription      *subscription
+	newSubscriber     chan struct{}
+	started           chan struct{}
+	startError        chan error
+	done              chan struct{}
+	sendQueue         chan *htlcAcceptedMsg
+	recvQueue         chan *htlcResultMsg
+}
+
+// Creates a new grpc server
+func NewServer(listenAddress string, subscriberTimeout time.Duration) *server {
+	// TODO: Set a sane max queue size
 	return &server{
-		listenAddress: listenAddress,
-		newSubscriber: make(chan struct{}, 1),
-		correlations:  make(map[uint64]chan *HtlcResolution),
-		index:         0,
+		listenAddress:     listenAddress,
+		subscriberTimeout: subscriberTimeout,
+		sendQueue:         make(chan *htlcAcceptedMsg, 10000),
+		recvQueue:         make(chan *htlcResultMsg, 10000),
+		started:           make(chan struct{}),
+		startError:        make(chan error, 1),
 	}
 }

+// Starts the grpc server. Blocks until the server is stopped. WaitStarted can
+// be called to ensure the server is started without errors if this function
+// is run as a goroutine.
 func (s *server) Start() error {
-	s.startMtx.Lock()
+	s.mtx.Lock()
 	if s.grpcServer != nil {
-		s.startMtx.Unlock()
+		s.mtx.Unlock()
 		return nil
 	}

 	lis, err := net.Listen("tcp", s.listenAddress)
 	if err != nil {
 		log.Printf("ERROR Server failed to listen: %v", err)
-		s.startMtx.Unlock()
+		s.startError <- err
+		s.mtx.Unlock()
 		return err
 	}

 	s.done = make(chan struct{})
+	s.newSubscriber = make(chan struct{})
 	s.grpcServer = grpc.NewServer()
-	s.startMtx.Unlock()
-	RegisterClnPluginServer(s.grpcServer, s)
+	s.mtx.Unlock()
+	proto.RegisterClnPluginServer(s.grpcServer, s)

 	log.Printf("Server starting to listen on %s.", s.listenAddress)
+	go s.listenHtlcRequests()
 	go s.listenHtlcResponses()
+	close(s.started)
 	return s.grpcServer.Serve(lis)
 }

+// Waits until the server has started, or errored during startup.
+func (s *server) WaitStarted() error {
+	select {
+	case <-s.started:
+		return nil
+	case err := <-s.startError:
+		return err
+	}
+}
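
A minimal usage sketch of the start sequence above, assuming it lives in the same package as the server; the caller name, address and timeout are illustrative, not taken from the commit:

    // runServer is a hypothetical caller; log and time are the package's
    // existing imports.
    func runServer() error {
        s := NewServer("127.0.0.1:12312", 30*time.Second)

        // Start blocks until the server stops, so run it in a goroutine.
        go func() {
            if err := s.Start(); err != nil {
                log.Printf("grpc server stopped: %v", err)
            }
        }()

        // WaitStarted returns once startup completed, or with the startup
        // error (for example when the listen address is already in use).
        return s.WaitStarted()
    }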

+// Stops all work from the grpc server immediately.
 func (s *server) Stop() {
-	s.startMtx.Lock()
-	defer s.startMtx.Unlock()
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
 	log.Printf("Server Stop() called.")
 	if s.grpcServer == nil {
 		return
@@ -76,36 +115,38 @@ func (s *server) Stop() {
 	s.grpcServer = nil
 }

-func (s *server) HtlcStream(stream ClnPlugin_HtlcStreamServer) error {
+// Grpc method that is called when a new client subscribes. There can only be
+// one subscriber active at a time. If there is an error receiving or sending
+// from or to the subscriber, the subscription is closed.
+func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
 	log.Printf("Got HTLC stream subscription request.")
-	s.startMtx.Lock()
+	s.mtx.Lock()
 	if s.subscription != nil {
-		s.startMtx.Unlock()
+		s.mtx.Unlock()
 		return fmt.Errorf("already subscribed")
 	}

 	sb := &subscription{
 		stream: stream,
 		done:   make(chan struct{}),
 		err:    make(chan error, 1),
 	}
 	s.subscription = sb
-	s.newSubscriber <- struct{}{}
-	s.startMtx.Unlock()
+
+	// Notify listeners that a new subscriber is active. Replace the chan with
+	// a new one immediately in case this subscriber is dropped later.
+	close(s.newSubscriber)
+	s.newSubscriber = make(chan struct{})
+	s.mtx.Unlock()

 	defer func() {
-		s.startMtx.Lock()
-		s.subscription = nil
-		close(sb.done)
-		s.startMtx.Unlock()
+		s.removeSubscriptionIfUnchanged(sb, nil)
 	}()

 	go func() {
 		<-stream.Context().Done()
 		log.Printf("HtlcStream context is done. Removing subscriber: %v", stream.Context().Err())
-		s.startMtx.Lock()
-		s.subscription = nil
-		close(sb.done)
-		s.startMtx.Unlock()
+		s.removeSubscriptionIfUnchanged(sb, stream.Context().Err())
 	}()

 	select {
@@ -115,82 +156,179 @@ func (s *server) HtlcStream(stream ClnPlugin_HtlcStreamServer) error {
 	case <-sb.done:
 		log.Printf("HTLC stream signalled done. Return EOF.")
 		return io.EOF
 	case err := <-sb.err:
 		log.Printf("HTLC stream signalled error. Return %v", err)
 		return err
 	}
 }

-func (s *server) Send(h *HtlcAccepted) *HtlcResolution {
-	sb := s.subscription
-	if sb == nil {
-		log.Printf("No subscribers available. Ignoring HtlcAccepted %+v", h)
-		return s.defaultResolution()
-	}
-
-	c := make(chan *HtlcResolution)
-	s.corrMtx.Lock()
-	s.index++
-	index := s.index
-	s.correlations[index] = c
-	s.corrMtx.Unlock()
-
-	h.Correlationid = index
-
-	defer func() {
-		s.corrMtx.Lock()
-		delete(s.correlations, index)
-		s.corrMtx.Unlock()
-		close(c)
-	}()
-
-	log.Printf("Sending HtlcAccepted: %+v", h)
-	err := sb.stream.Send(h)
-	if err != nil {
-		// TODO: Close the connection? Reset the subscriber?
-		log.Printf("Send() errored, Correlationid: %d: %v", index, err)
-		return s.defaultResolution()
+// Enqueues a htlc_accepted message for send to the grpc client.
+func (s *server) Send(id string, h *HtlcAccepted) {
+	s.sendQueue <- &htlcAcceptedMsg{
+		id:      id,
+		htlc:    h,
+		timeout: time.After(s.subscriberTimeout),
 	}
 }

+// Receives the next htlc resolution message from the grpc client. Returns id
+// and message. Blocks until a message is available. Returns a nil message if
+// the server is done.
+func (s *server) Receive() (string, interface{}) {
 	select {
 	case <-s.done:
-		log.Printf("Signalled done while waiting for htlc resolution, Correlationid: %d, Ignoring: %+v", index, h)
-		return s.defaultResolution()
-	case resolution := <-c:
-		log.Printf("Got resolution, Correlationid: %d: %+v", index, h)
-		return resolution
+		return "", nil
+	case msg := <-s.recvQueue:
+		return msg.id, msg.result
 	}
 }
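
A sketch of how the plugin side might glue these two calls together, with cln's internal request id from the htlc_accepted hook used directly as the correlation id; onHtlcAccepted and reply are hypothetical names, not part of this commit:

    // On htlc_accepted from cln, forward the hook keyed by cln's own
    // request id; no separate correlation index is needed anymore.
    func onHtlcAccepted(s *server, clnRequestId string, htlc *HtlcAccepted) {
        s.Send(clnRequestId, htlc)
    }

    // A single loop drains results and replies to the matching cln request.
    // Receive returns a nil result once the server is done.
    func resolutionLoop(s *server, reply func(id string, result interface{})) {
        for {
            id, result := s.Receive()
            if result == nil {
                return
            }
            reply(id, result)
        }
    }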

-func (s *server) recv() *HtlcResolution {
+// Helper function that blocks until a message from a grpc client is received
+// or the server stops. Either returns a received message, or nil if the server
+// has stopped.
+func (s *server) recv() *proto.HtlcResolution {
 	for {
+		// make a copy of the used fields, to make sure state updates don't
+		// surprise us. The newSubscriber chan is swapped whenever a new
+		// subscriber arrives.
+		s.mtx.Lock()
 		sb := s.subscription
+		ns := s.newSubscriber
+		s.mtx.Unlock()

 		if sb == nil {
 			log.Printf("Got no subscribers for receive. Waiting for subscriber.")
 			select {
 			case <-s.done:
 				log.Printf("Done signalled, stopping receive.")
-				return s.defaultResolution()
-			case <-s.newSubscriber:
+				return nil
+			case <-ns:
 				log.Printf("New subscription available for receive, continue receive.")
 				continue
 			}
 		}

+		// There is a subscription active. Attempt to receive a message.
 		r, err := sb.stream.Recv()
 		if err == nil {
 			log.Printf("Received HtlcResolution %+v", r)
 			return r
 		}

-		// TODO: close the subscription??
-		log.Printf("Recv() errored, waiting %v: %v", receiveWaitDelay, err)
+		// Receiving the message failed, so the subscription is broken. Remove
+		// it if it hasn't been updated already. We'll try receiving again in
+		// the next iteration of the for loop.
+		log.Printf("Recv() errored, removing subscription: %v", err)
+		s.removeSubscriptionIfUnchanged(sb, err)
 	}
 }

+// Stops and removes the subscription if this is the currently active
+// subscription. If the subscription was changed in the meantime, this function
+// does nothing.
+func (s *server) removeSubscriptionIfUnchanged(sb *subscription, err error) {
+	s.mtx.Lock()
+	// If the subscription reference hasn't changed yet in the meantime, kill it.
+	if s.subscription == sb {
+		if err == nil {
+			close(sb.done)
+		} else {
+			sb.err <- err
+		}
+		s.subscription = nil
+	}
+	s.mtx.Unlock()
+}
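
The identity check above is what lets the teardown paths race safely: the deferred cleanup in HtlcStream, the stream-context watcher and the receive loop may all try to clear the subscription, but a stale caller can never remove a newer subscriber. A self-contained toy illustration of the same compare-then-clear idiom (all names hypothetical, not from this commit):

    package main

    import (
        "fmt"
        "sync"
    )

    type slot struct {
        mtx     sync.Mutex
        current *int
    }

    // clearIfUnchanged only clears the slot if it still holds the value the
    // caller observed earlier; a concurrent replacement is left untouched.
    func (s *slot) clearIfUnchanged(seen *int) {
        s.mtx.Lock()
        defer s.mtx.Unlock()
        if s.current == seen {
            s.current = nil
        }
    }

    func main() {
        a, b := 1, 2
        s := &slot{current: &a}
        s.current = &b          // a new "subscriber" replaced the old one
        s.clearIfUnchanged(&a)  // stale teardown: no effect
        fmt.Println(*s.current) // prints 2
    }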

+// Listens to sendQueue for htlc_accepted requests from cln. The message will be
+// held until a subscriber is active, or the subscriber timeout expires. The
+// messages are sent to the grpc client in fifo order.
+func (s *server) listenHtlcRequests() {
 	for {
 		select {
 		case <-s.done:
-			log.Printf("Done signalled, stopping receive.")
-			return s.defaultResolution()
-		case <-time.After(receiveWaitDelay):
+			log.Printf("listenHtlcRequests received done. Stop listening.")
+			return
+		case msg := <-s.sendQueue:
+			s.handleHtlcAccepted(msg)
 		}
 	}
 }

+// Attempts to send a htlc_accepted message to the grpc client. The message will
+// be held until a subscriber is active, or the subscriber timeout expires.
+func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
+	for {
+		s.mtx.Lock()
+		sb := s.subscription
+		ns := s.newSubscriber
+		s.mtx.Unlock()
+
+		// If there is no active subscription, wait until there is a new
+		// subscriber, or the message times out.
+		if sb == nil {
+			select {
+			case <-s.done:
+				log.Printf("handleHtlcAccepted received server done. Stop processing.")
+				return
+			case <-ns:
+				log.Printf("got a new subscriber. continue handleHtlcAccepted.")
+				continue
+			case <-msg.timeout:
+				log.Printf(
+					"WARNING: htlc with id '%s' timed out after '%v' waiting "+
+						"for grpc subscriber: %+v",
+					msg.id,
+					s.subscriberTimeout,
+					msg.htlc,
+				)
+				s.recvQueue <- &htlcResultMsg{
+					id:     msg.id,
+					result: s.defaultResult(),
+				}
+
+				return
+			}
+		}
+
+		// There is a subscriber. Attempt to send the htlc_accepted message.
+		err := sb.stream.Send(&proto.HtlcAccepted{
+			Correlationid: msg.id,
+			Onion: &proto.Onion{
+				Payload:           msg.htlc.Onion.Payload,
+				ShortChannelId:    msg.htlc.Onion.ShortChannelId,
+				ForwardMsat:       msg.htlc.Onion.ForwardMsat,
+				OutgoingCltvValue: msg.htlc.Onion.OutgoingCltvValue,
+				SharedSecret:      msg.htlc.Onion.SharedSecret,
+				NextOnion:         msg.htlc.Onion.NextOnion,
+			},
+			Htlc: &proto.Htlc{
+				ShortChannelId:     msg.htlc.Htlc.ShortChannelId,
+				Id:                 msg.htlc.Htlc.Id,
+				AmountMsat:         msg.htlc.Htlc.AmountMsat,
+				CltvExpiry:         msg.htlc.Htlc.CltvExpiry,
+				CltvExpiryRelative: msg.htlc.Htlc.CltvExpiryRelative,
+				PaymentHash:        msg.htlc.Htlc.PaymentHash,
+			},
+			ForwardTo: msg.htlc.ForwardTo,
+		})
+
+		// If there is no error, we're done.
+		if err == nil {
+			return
+		}
+
+		// If we end up here, there was an error sending the message to the
+		// grpc client.
+		log.Printf("Error sending htlc_accepted message to subscriber. "+
+			"Removing subscription: %v", err)
+		s.removeSubscriptionIfUnchanged(sb, err)
+	}
+}
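
Note that msg.timeout was created by time.After back in Send, at enqueue time, so time spent queued behind other messages also counts against subscriberTimeout, not only the wait inside this function. A small runnable illustration of that property (durations are arbitrary):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // The timer starts now, at "enqueue" time...
        timeout := time.After(100 * time.Millisecond)

        // ...so time spent sitting in a queue still consumes the budget.
        time.Sleep(80 * time.Millisecond)

        select {
        case <-timeout:
            fmt.Println("timed out")
        case <-time.After(50 * time.Millisecond):
            // Only ~20ms of the budget is left, so the timeout fires first.
            fmt.Println("sent in time")
        }
    }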

+// Listens to htlc responses from the grpc client and appends them to the
+// receive queue. The messages from the receive queue are read in the Receive
+// function.
 func (s *server) listenHtlcResponses() {
 	for {
 		select {
@@ -198,23 +336,77 @@ func (s *server) listenHtlcResponses() {
 			log.Printf("listenHtlcResponses received done. Stopping listening.")
 			return
 		default:
-			response := s.recv()
-			s.corrMtx.Lock()
-			correlation, ok := s.correlations[response.Correlationid]
-			s.corrMtx.Unlock()
-			if ok {
-				correlation <- response
-			} else {
-				log.Printf("Got HTLC resolution that could not be correlated: %+v", response)
+			resp := s.recv()
+			s.recvQueue <- &htlcResultMsg{
+				id:     resp.Correlationid,
+				result: s.mapResult(resp.Outcome),
 			}
 		}
 	}
 }

-func (s *server) defaultResolution() *HtlcResolution {
-	return &HtlcResolution{
-		Outcome: &HtlcResolution_Continue{
-			Continue: &HtlcContinue{},
-		},
+// Maps a grpc result to the corresponding result for cln. The cln message
+// is a raw json message, so it's easiest to use a map directly.
+func (s *server) mapResult(outcome interface{}) interface{} {
+	// result: continue
+	cont, ok := outcome.(*proto.HtlcResolution_Continue)
+	if ok {
+		result := map[string]interface{}{
+			"result": "continue",
+		}
+
+		if cont.Continue.ForwardTo != nil {
+			result["forward_to"] = *cont.Continue.ForwardTo
+		}
+
+		if cont.Continue.Payload != nil {
+			result["payload"] = *cont.Continue.Payload
+		}
+
+		return result
+	}
+
+	// result: fail
+	fail, ok := outcome.(*proto.HtlcResolution_Fail)
+	if ok {
+		result := map[string]interface{}{
+			"result": "fail",
+		}
+
+		fm, ok := fail.Fail.Failure.(*proto.HtlcFail_FailureMessage)
+		if ok {
+			result["failure_message"] = fm.FailureMessage
+		}
+
+		fo, ok := fail.Fail.Failure.(*proto.HtlcFail_FailureOnion)
+		if ok {
+			result["failure_onion"] = fo.FailureOnion
+		}
+
+		return result
+	}
+
+	// result: resolve
+	resolve, ok := outcome.(*proto.HtlcResolution_Resolve)
+	if ok {
+		result := map[string]interface{}{
+			"result":      "resolve",
+			"payment_key": resolve.Resolve.PaymentKey,
+		}
+
+		return result
+	}
+
+	// On an unknown result we haven't implemented all possible cases from the
+	// grpc message. We don't understand what's going on, so we'll return
+	// result: continue.
+	log.Printf("Unexpected htlc resolution type %T: %+v", outcome, outcome)
+	return s.defaultResult()
+}

+// Returns a result: continue message.
+func (s *server) defaultResult() interface{} {
+	return map[string]interface{}{
+		"result": "continue",
 	}
 }
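
Because cln consumes the hook response as raw JSON, the maps returned by mapResult and defaultResult serialize directly into the shape cln expects. A quick runnable illustration (the failure_message value is an arbitrary example, not from the commit):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    func main() {
        result := map[string]interface{}{
            "result":          "fail",
            "failure_message": "2002",
        }
        b, _ := json.Marshal(result)
        // json.Marshal sorts map keys alphabetically:
        // {"failure_message":"2002","result":"fail"}
        fmt.Println(string(b))
    }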