diff --git a/rpcserver.go b/rpcserver.go index 11227fe6..35eec952 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3680,25 +3680,28 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { htlcSema <- struct{}{} } + // We keep track of the running goroutines and set up a quit signal we + // can use to request them to exit if the method returns because of an + // encountered error. + var wg sync.WaitGroup + reqQuit := make(chan struct{}) + defer close(reqQuit) + // Launch a new goroutine to handle reading new payment requests from // the client. This way we can handle errors independently of blocking // and waiting for the next payment request to come through. - reqQuit := make(chan struct{}) - defer func() { - close(reqQuit) - }() - // TODO(joostjager): Callers expect result to come in in the same order // as the request were sent, but this is far from guarantueed in the // code below. + wg.Add(1) go func() { + defer wg.Done() + for { select { case <-reqQuit: return - case <-r.quit: - errChan <- nil - return + default: // Receive the next pending payment within the // stream sent by the client. If we read the @@ -3706,13 +3709,15 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { // stream, and we can exit normally. nextPayment, err := stream.recv() if err == io.EOF { - errChan <- nil + close(payChan) return } else if err != nil { + rpcsLog.Errorf("Failed receiving from "+ + "stream: %v", err) + select { case errChan <- err: - case <-reqQuit: - return + default: } return } @@ -3730,18 +3735,22 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { PaymentError: err.Error(), PaymentHash: payIntent.rHash[:], }); err != nil { + rpcsLog.Errorf("Failed "+ + "sending on "+ + "stream: %v", err) + select { case errChan <- err: - case <-reqQuit: - return + default: } + return } continue } // If the payment was well formed, then we'll // send to the dispatch goroutine, or exit, - // which ever comes first + // which ever comes first. select { case payChan <- &payIntent: case <-reqQuit: @@ -3751,20 +3760,41 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { } }() +sendLoop: for { select { + + // If we encounter and error either during sending or + // receiving, we return directly, closing the stream. case err := <-errChan: return err - case payIntent := <-payChan: + case <-r.quit: + return errors.New("rpc server shutting down") + + case payIntent, ok := <-payChan: + // If the receive loop is done, we break the send loop + // and wait for the ongoing payments to finish before + // exiting. + if !ok { + break sendLoop + } + // We launch a new goroutine to execute the current // payment so we can continue to serve requests while // this payment is being dispatched. + wg.Add(1) go func() { + defer wg.Done() + // Attempt to grab a free semaphore slot, using // a defer to eventually release the slot // regardless of payment success. - <-htlcSema + select { + case <-htlcSema: + case <-reqQuit: + return + } defer func() { htlcSema <- struct{}{} }() @@ -3778,7 +3808,13 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { // payment, then we'll return the error to the // user, and terminate. case saveErr != nil: - errChan <- saveErr + rpcsLog.Errorf("Failed dispatching "+ + "payment intent: %v", saveErr) + + select { + case errChan <- saveErr: + default: + } return // If we receive payment error than, instead of @@ -3790,7 +3826,14 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { PaymentHash: payIntent.rHash[:], }) if err != nil { - errChan <- err + rpcsLog.Errorf("Failed "+ + "sending error "+ + "response: %v", err) + + select { + case errChan <- err: + default: + } } return } @@ -3810,12 +3853,22 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { PaymentRoute: marshalledRouted, }) if err != nil { - errChan <- err + rpcsLog.Errorf("Failed sending "+ + "response: %v", err) + + select { + case errChan <- err: + default: + } return } }() } } + + // Wait for all goroutines to finish before closing the stream. + wg.Wait() + return nil } // SendPaymentSync is the synchronous non-streaming version of SendPayment.