cln_plugin: rename htlc related fields/functions

This commit is contained in:
Jesse de Wit
2023-08-10 08:50:47 +02:00
parent 8ff4a61f06
commit 5c6f7da265
2 changed files with 36 additions and 34 deletions

View File

@@ -31,14 +31,14 @@ type server struct {
subscriberTimeout time.Duration
grpcServer *grpc.Server
mtx sync.Mutex
stream proto.ClnPlugin_HtlcStreamServer
newSubscriber chan struct{}
started chan struct{}
done chan struct{}
completed chan struct{}
startError chan error
sendQueue chan *htlcAcceptedMsg
recvQueue chan *htlcResultMsg
htlcnewSubscriber chan struct{}
htlcStream proto.ClnPlugin_HtlcStreamServer
htlcSendQueue chan *htlcAcceptedMsg
htlcRecvQueue chan *htlcResultMsg
}
// Creates a new grpc server
@@ -48,13 +48,13 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server {
listenAddress: listenAddress,
subscriberTimeout: subscriberTimeout,
// The send queue exists to buffer messages until a subscriber is active.
sendQueue: make(chan *htlcAcceptedMsg, 10000),
htlcSendQueue: make(chan *htlcAcceptedMsg, 10000),
// The receive queue exists mainly to allow returning timeouts to the
// cln plugin. If there is no subscriber active within the subscriber
// timeout period these results can be put directly on the receive queue.
recvQueue: make(chan *htlcResultMsg, 10000),
started: make(chan struct{}),
startError: make(chan error, 1),
htlcRecvQueue: make(chan *htlcResultMsg, 10000),
started: make(chan struct{}),
startError: make(chan error, 1),
}
}
@@ -78,7 +78,7 @@ func (s *server) Start() error {
s.done = make(chan struct{})
s.completed = make(chan struct{})
s.newSubscriber = make(chan struct{})
s.htlcnewSubscriber = make(chan struct{})
s.grpcServer = grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: time.Duration(1) * time.Second,
@@ -132,7 +132,7 @@ func (s *server) Stop() {
// from or to the subscriber, the subscription is closed.
func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
s.mtx.Lock()
if s.stream == nil {
if s.htlcStream == nil {
log.Printf("Got a new HTLC stream subscription request.")
} else {
s.mtx.Unlock()
@@ -141,12 +141,12 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
return fmt.Errorf("already subscribed")
}
s.stream = stream
s.htlcStream = stream
// Notify listeners that a new subscriber is active. Replace the chan with
// a new one immediately in case this subscriber is dropped later.
close(s.newSubscriber)
s.newSubscriber = make(chan struct{})
close(s.htlcnewSubscriber)
s.htlcnewSubscriber = make(chan struct{})
s.mtx.Unlock()
<-stream.Context().Done()
@@ -154,15 +154,15 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
// Remove the subscriber.
s.mtx.Lock()
s.stream = nil
s.htlcStream = nil
s.mtx.Unlock()
return stream.Context().Err()
}
// Enqueues a htlc_accepted message for send to the grpc client.
func (s *server) Send(id string, h *HtlcAccepted) {
s.sendQueue <- &htlcAcceptedMsg{
func (s *server) SendHtlcAccepted(id string, h *HtlcAccepted) {
s.htlcSendQueue <- &htlcAcceptedMsg{
id: id,
htlc: h,
timeout: time.Now().Add(s.subscriberTimeout),
@@ -173,11 +173,11 @@ func (s *server) Send(id string, h *HtlcAccepted) {
// and message. Blocks until a message is available. Returns a nil message if
// the server is done. This function effectively waits until a subscriber is
// active and has sent a message.
func (s *server) Receive() (string, interface{}) {
func (s *server) ReceiveHtlcResolution() (string, interface{}) {
select {
case <-s.done:
return "", nil
case msg := <-s.recvQueue:
case msg := <-s.htlcRecvQueue:
return msg.id, msg.result
}
}
@@ -191,7 +191,7 @@ func (s *server) listenHtlcRequests() {
case <-s.done:
log.Printf("listenHtlcRequests received done. Stop listening.")
return
case msg := <-s.sendQueue:
case msg := <-s.htlcSendQueue:
s.handleHtlcAccepted(msg)
}
}
@@ -202,8 +202,8 @@ func (s *server) listenHtlcRequests() {
func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
for {
s.mtx.Lock()
stream := s.stream
ns := s.newSubscriber
stream := s.htlcStream
ns := s.htlcnewSubscriber
s.mtx.Unlock()
// If there is no active subscription, wait until there is a new
@@ -228,7 +228,7 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
// If the subscriber timeout expires while holding the htlc
// we short circuit the htlc by sending the default result
// (continue) to cln.
s.recvQueue <- &htlcResultMsg{
s.htlcRecvQueue <- &htlcResultMsg{
id: msg.id,
result: s.defaultResult(),
}
@@ -283,10 +283,10 @@ func (s *server) listenHtlcResponses() {
log.Printf("listenHtlcResponses received done. Stopping listening.")
return
default:
resp := s.recv()
s.recvQueue <- &htlcResultMsg{
resp := s.recvHtlcResolution()
s.htlcRecvQueue <- &htlcResultMsg{
id: resp.Correlationid,
result: s.mapResult(resp.Outcome),
result: s.mapHtlcResult(resp.Outcome),
}
}
}
@@ -295,14 +295,14 @@ func (s *server) listenHtlcResponses() {
// Helper function that blocks until a message from a grpc client is received
// or the server stops. Either returns a received message, or nil if the server
// has stopped.
func (s *server) recv() *proto.HtlcResolution {
func (s *server) recvHtlcResolution() *proto.HtlcResolution {
for {
// make a copy of the used fields, to make sure state updates don't
// surprise us. The newSubscriber chan is swapped whenever a new
// subscriber arrives.
s.mtx.Lock()
stream := s.stream
ns := s.newSubscriber
stream := s.htlcStream
ns := s.htlcnewSubscriber
s.mtx.Unlock()
if stream == nil {
@@ -336,7 +336,7 @@ func (s *server) recv() *proto.HtlcResolution {
// Maps a grpc result to the corresponding result for cln. The cln message
// is a raw json message, so it's easiest to use a map directly.
func (s *server) mapResult(outcome interface{}) interface{} {
func (s *server) mapHtlcResult(outcome interface{}) interface{} {
// result: continue
cont, ok := outcome.(*proto.HtlcResolution_Continue)
if ok {