mirror of
https://github.com/getAlby/lndhub.go.git
synced 2026-01-16 03:15:57 +01:00
Merge pull request #135 from getAlby/feature/event-stream
Feature/event-stream
This commit is contained in:
@@ -19,7 +19,7 @@ func NewGetTXSController(svc *service.LndhubService) *GetTXSController {
|
||||
}
|
||||
|
||||
type OutgoingInvoice struct {
|
||||
RHash interface{} `json:"r_hash"`
|
||||
RHash interface{} `json:"r_hash,omitempty"`
|
||||
PaymentHash interface{} `json:"payment_hash"`
|
||||
PaymentPreimage string `json:"payment_preimage"`
|
||||
Value int64 `json:"value"`
|
||||
@@ -30,7 +30,7 @@ type OutgoingInvoice struct {
|
||||
}
|
||||
|
||||
type IncomingInvoice struct {
|
||||
RHash interface{} `json:"r_hash"`
|
||||
RHash interface{} `json:"r_hash,omitempty"`
|
||||
PaymentHash interface{} `json:"payment_hash"`
|
||||
PaymentRequest string `json:"payment_request"`
|
||||
Description string `json:"description"`
|
||||
|
||||
150
controllers/invoicestream.ctrl.go
Normal file
150
controllers/invoicestream.ctrl.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/getAlby/lndhub.go/common"
|
||||
"github.com/getAlby/lndhub.go/db/models"
|
||||
"github.com/getAlby/lndhub.go/lib/service"
|
||||
"github.com/getAlby/lndhub.go/lib/tokens"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/labstack/echo/v4"
|
||||
)
|
||||
|
||||
// GetTXSController : GetTXSController struct
|
||||
type InvoiceStreamController struct {
|
||||
svc *service.LndhubService
|
||||
}
|
||||
|
||||
type InvoiceEventWrapper struct {
|
||||
Type string `json:"type"`
|
||||
Invoice *IncomingInvoice `json:"invoice,omitempty"`
|
||||
}
|
||||
|
||||
func NewInvoiceStreamController(svc *service.LndhubService) *InvoiceStreamController {
|
||||
return &InvoiceStreamController{svc: svc}
|
||||
}
|
||||
|
||||
// Stream invoices streams incoming payments to the client
|
||||
func (controller *InvoiceStreamController) StreamInvoices(c echo.Context) error {
|
||||
userId, err := tokens.ParseToken(controller.svc.Config.JWTSecret, (c.QueryParam("token")), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
invoiceChan := make(chan models.Invoice)
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
ws, done, err := createWebsocketUpgrader(c)
|
||||
defer ws.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
//start subscription
|
||||
subId := controller.svc.InvoicePubSub.Subscribe(userId, invoiceChan)
|
||||
|
||||
//start with keepalive message
|
||||
err = ws.WriteJSON(&InvoiceEventWrapper{Type: "keepalive"})
|
||||
if err != nil {
|
||||
controller.svc.Logger.Error(err)
|
||||
controller.svc.InvoicePubSub.Unsubscribe(subId, userId)
|
||||
return err
|
||||
}
|
||||
fromPaymentHash := c.QueryParam("since_payment_hash")
|
||||
if fromPaymentHash != "" {
|
||||
err = controller.writeMissingInvoices(c, userId, ws, fromPaymentHash)
|
||||
if err != nil {
|
||||
controller.svc.Logger.Error(err)
|
||||
controller.svc.InvoicePubSub.Unsubscribe(subId, userId)
|
||||
return err
|
||||
}
|
||||
}
|
||||
SocketLoop:
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
break SocketLoop
|
||||
case <-ticker.C:
|
||||
err := ws.WriteJSON(&InvoiceEventWrapper{Type: "keepalive"})
|
||||
if err != nil {
|
||||
controller.svc.Logger.Error(err)
|
||||
break SocketLoop
|
||||
}
|
||||
case invoice := <-invoiceChan:
|
||||
err := ws.WriteJSON(
|
||||
&InvoiceEventWrapper{
|
||||
Type: "invoice",
|
||||
Invoice: &IncomingInvoice{
|
||||
PaymentHash: invoice.RHash,
|
||||
PaymentRequest: invoice.PaymentRequest,
|
||||
Description: invoice.Memo,
|
||||
PayReq: invoice.PaymentRequest,
|
||||
Timestamp: invoice.CreatedAt.Unix(),
|
||||
Type: common.InvoiceTypeUser,
|
||||
Amount: invoice.Amount,
|
||||
IsPaid: invoice.State == common.InvoiceStateSettled,
|
||||
}})
|
||||
if err != nil {
|
||||
controller.svc.Logger.Error(err)
|
||||
break SocketLoop
|
||||
}
|
||||
}
|
||||
}
|
||||
controller.svc.InvoicePubSub.Unsubscribe(subId, userId)
|
||||
return nil
|
||||
}
|
||||
|
||||
//open the websocket and start listening for close messages in a goroutine
|
||||
func createWebsocketUpgrader(c echo.Context) (conn *websocket.Conn, done chan struct{}, err error) {
|
||||
upgrader := websocket.Upgrader{}
|
||||
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
|
||||
ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
//start listening for close messages
|
||||
done = make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
for {
|
||||
_, _, err := ws.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return ws, done, nil
|
||||
}
|
||||
|
||||
func (controller *InvoiceStreamController) writeMissingInvoices(c echo.Context, userId int64, ws *websocket.Conn, hash string) error {
|
||||
invoices, err := controller.svc.InvoicesFor(c.Request().Context(), userId, common.InvoiceTypeIncoming)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, inv := range invoices {
|
||||
//invoices are order from newest to oldest (with a maximum of 100 invoices being returned)
|
||||
//so if we get a match on the hash, we have processed all missing invoices for this client
|
||||
if inv.RHash == hash {
|
||||
break
|
||||
}
|
||||
if inv.State == common.InvoiceStateSettled {
|
||||
err := ws.WriteJSON(
|
||||
&InvoiceEventWrapper{
|
||||
Type: "invoice",
|
||||
Invoice: &IncomingInvoice{
|
||||
PaymentHash: inv.RHash,
|
||||
PaymentRequest: inv.PaymentRequest,
|
||||
Description: inv.Memo,
|
||||
PayReq: inv.PaymentRequest,
|
||||
Timestamp: inv.CreatedAt.Unix(),
|
||||
Type: common.InvoiceTypeUser,
|
||||
Amount: inv.Amount,
|
||||
IsPaid: inv.State == common.InvoiceStateSettled,
|
||||
}})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
1
go.mod
1
go.mod
@@ -28,6 +28,7 @@ require (
|
||||
|
||||
require (
|
||||
github.com/SporkHubr/echo-http-cache v0.0.0-20200706100054-1d7ae9f38029
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/labstack/echo-contrib v0.12.0
|
||||
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
|
||||
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d // indirect
|
||||
|
||||
3
go.sum
3
go.sum
@@ -359,8 +359,9 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
|
||||
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
|
||||
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
|
||||
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
|
||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
|
||||
|
||||
@@ -90,6 +90,10 @@ type ExpectedIncomingInvoice struct {
|
||||
Amount int64 `json:"amt"`
|
||||
IsPaid bool `json:"ispaid"`
|
||||
}
|
||||
type ExpectedInvoiceEventWrapper struct {
|
||||
Type string `json:"type"`
|
||||
Invoice *ExpectedIncomingInvoice `json:"invoice,omitempty"`
|
||||
}
|
||||
|
||||
type ExpectedPayInvoiceRequestBody struct {
|
||||
Invoice string `json:"invoice" validate:"required"`
|
||||
|
||||
@@ -92,6 +92,7 @@ func LndHubTestServiceInit(lndClientMock lnd.LightningClientWrapper) (svc *servi
|
||||
}
|
||||
svc.IdentityPubkey = getInfo.IdentityPubkey
|
||||
|
||||
svc.InvoicePubSub = service.NewPubsub()
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
|
||||
278
integration_tests/websocket_test.go
Normal file
278
integration_tests/websocket_test.go
Normal file
@@ -0,0 +1,278 @@
|
||||
package integration_tests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/getAlby/lndhub.go/controllers"
|
||||
"github.com/getAlby/lndhub.go/lib"
|
||||
"github.com/getAlby/lndhub.go/lib/responses"
|
||||
"github.com/getAlby/lndhub.go/lib/service"
|
||||
"github.com/getAlby/lndhub.go/lib/tokens"
|
||||
"github.com/getAlby/lndhub.go/lnd"
|
||||
"github.com/go-playground/validator/v10"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/lightningnetwork/lnd/lnrpc"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type KeepAlive struct {
|
||||
Type string
|
||||
}
|
||||
|
||||
type WebSocketTestSuite struct {
|
||||
TestSuite
|
||||
fundingClient *lnd.LNDWrapper
|
||||
service *service.LndhubService
|
||||
userLogin ExpectedCreateUserResponseBody
|
||||
userToken string
|
||||
userToken2 string
|
||||
invoiceUpdateSubCancelFn context.CancelFunc
|
||||
websocketServer *httptest.Server
|
||||
wsUrl string
|
||||
wsUrl2 string
|
||||
}
|
||||
type WsHandler struct {
|
||||
handler echo.HandlerFunc
|
||||
}
|
||||
|
||||
func (h *WsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
e := echo.New()
|
||||
c := e.NewContext(r, w)
|
||||
|
||||
err := h.handler(c)
|
||||
if err != nil {
|
||||
_, _ = w.Write([]byte(err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *WebSocketTestSuite) SetupSuite() {
|
||||
lndClient, err := lnd.NewLNDclient(lnd.LNDoptions{
|
||||
Address: lnd2RegtestAddress,
|
||||
MacaroonHex: lnd2RegtestMacaroonHex,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("Error setting up funding client: %v", err)
|
||||
}
|
||||
suite.fundingClient = lndClient
|
||||
|
||||
svc, err := LndHubTestServiceInit(nil)
|
||||
if err != nil {
|
||||
log.Fatalf("Error initializing test service: %v", err)
|
||||
}
|
||||
users, userTokens, err := createUsers(svc, 2)
|
||||
if err != nil {
|
||||
log.Fatalf("Error creating test users: %v", err)
|
||||
}
|
||||
// Subscribe to LND invoice updates in the background
|
||||
// store cancel func to be called in tear down suite
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
suite.invoiceUpdateSubCancelFn = cancel
|
||||
go svc.InvoiceUpdateSubscription(ctx)
|
||||
suite.service = svc
|
||||
e := echo.New()
|
||||
|
||||
e.HTTPErrorHandler = responses.HTTPErrorHandler
|
||||
e.Validator = &lib.CustomValidator{Validator: validator.New()}
|
||||
suite.echo = e
|
||||
assert.Equal(suite.T(), 2, len(users))
|
||||
assert.Equal(suite.T(), 2, len(userTokens))
|
||||
suite.userLogin = users[0]
|
||||
suite.userToken = userTokens[0]
|
||||
suite.userToken2 = userTokens[1]
|
||||
suite.echo.Use(tokens.Middleware([]byte(suite.service.Config.JWTSecret)))
|
||||
suite.echo.POST("/addinvoice", controllers.NewAddInvoiceController(suite.service).AddInvoice)
|
||||
|
||||
//websocket server
|
||||
h := WsHandler{handler: controllers.NewInvoiceStreamController(suite.service).StreamInvoices}
|
||||
server := httptest.NewServer(http.HandlerFunc(h.ServeHTTP))
|
||||
suite.websocketServer = server
|
||||
suite.wsUrl = "ws" + strings.TrimPrefix(suite.websocketServer.URL, "http") + fmt.Sprintf("?token=%s", suite.userToken)
|
||||
suite.wsUrl2 = "ws" + strings.TrimPrefix(suite.websocketServer.URL, "http") + fmt.Sprintf("?token=%s", suite.userToken2)
|
||||
}
|
||||
|
||||
func (suite *WebSocketTestSuite) TestWebSocket() {
|
||||
//start listening to websocket
|
||||
ws, _, err := websocket.DefaultDialer.Dial(suite.wsUrl, nil)
|
||||
assert.NoError(suite.T(), err)
|
||||
_, msg, err := ws.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
keepAlive := KeepAlive{}
|
||||
err = json.Unmarshal([]byte(msg), &keepAlive)
|
||||
assert.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), "keepalive", keepAlive.Type)
|
||||
|
||||
// create incoming invoice and fund account
|
||||
invoice := suite.createAddInvoiceReq(1000, "integration test websocket 1", suite.userToken)
|
||||
sendPaymentRequest := lnrpc.SendRequest{
|
||||
PaymentRequest: invoice.PayReq,
|
||||
FeeLimit: nil,
|
||||
}
|
||||
_, err = suite.fundingClient.SendPaymentSync(context.Background(), &sendPaymentRequest)
|
||||
assert.NoError(suite.T(), err)
|
||||
|
||||
_, msg, err = ws.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
event := ExpectedInvoiceEventWrapper{}
|
||||
err = json.Unmarshal([]byte(msg), &event)
|
||||
assert.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), event.Type, "invoice")
|
||||
assert.Equal(suite.T(), int64(1000), event.Invoice.Amount)
|
||||
assert.Equal(suite.T(), "integration test websocket 1", event.Invoice.Description)
|
||||
}
|
||||
|
||||
func (suite *WebSocketTestSuite) TestWebSocketDoubeSubscription() {
|
||||
//create 1st subscription
|
||||
ws1, _, err := websocket.DefaultDialer.Dial(suite.wsUrl, nil)
|
||||
assert.NoError(suite.T(), err)
|
||||
//read keepalive msg
|
||||
_, _, err = ws1.ReadMessage()
|
||||
//create 2nd subscription, create invoice, pay invoice, assert that invoice is received twice
|
||||
//start listening to websocket
|
||||
ws2, _, err := websocket.DefaultDialer.Dial(suite.wsUrl, nil)
|
||||
assert.NoError(suite.T(), err)
|
||||
//read keepalive msg
|
||||
_, _, err = ws2.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
invoice := suite.createAddInvoiceReq(1000, "integration test websocket 2", suite.userToken)
|
||||
sendPaymentRequest := lnrpc.SendRequest{
|
||||
PaymentRequest: invoice.PayReq,
|
||||
FeeLimit: nil,
|
||||
}
|
||||
_, err = suite.fundingClient.SendPaymentSync(context.Background(), &sendPaymentRequest)
|
||||
assert.NoError(suite.T(), err)
|
||||
_, msg1, err := ws1.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
_, msg2, err := ws2.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
|
||||
event1 := ExpectedInvoiceEventWrapper{}
|
||||
err = json.Unmarshal([]byte(msg1), &event1)
|
||||
assert.NoError(suite.T(), err)
|
||||
event2 := ExpectedInvoiceEventWrapper{}
|
||||
err = json.Unmarshal([]byte(msg2), &event2)
|
||||
assert.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), "integration test websocket 2", event1.Invoice.Description)
|
||||
assert.Equal(suite.T(), "integration test websocket 2", event2.Invoice.Description)
|
||||
//close 1 subscription, assert that the existing sub still receives their invoices
|
||||
ws1.Close()
|
||||
invoice = suite.createAddInvoiceReq(1000, "integration test websocket 3", suite.userToken)
|
||||
sendPaymentRequest = lnrpc.SendRequest{
|
||||
PaymentRequest: invoice.PayReq,
|
||||
FeeLimit: nil,
|
||||
}
|
||||
_, err = suite.fundingClient.SendPaymentSync(context.Background(), &sendPaymentRequest)
|
||||
assert.NoError(suite.T(), err)
|
||||
_, msg3, err := ws2.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
event3 := ExpectedInvoiceEventWrapper{}
|
||||
err = json.Unmarshal([]byte(msg3), &event3)
|
||||
assert.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), "integration test websocket 3", event3.Invoice.Description)
|
||||
|
||||
}
|
||||
func (suite *WebSocketTestSuite) TestWebSocketDoubleUser() {
|
||||
|
||||
//create subs for 2 different users, assert that they each get their own invoice updates
|
||||
user1Ws, _, err := websocket.DefaultDialer.Dial(suite.wsUrl, nil)
|
||||
assert.NoError(suite.T(), err)
|
||||
//read keepalive msg
|
||||
_, _, err = user1Ws.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
//create subs for 2 different users, assert that they each get their own invoice updates
|
||||
user2Ws, _, err := websocket.DefaultDialer.Dial(suite.wsUrl2, nil)
|
||||
assert.NoError(suite.T(), err)
|
||||
//read keepalive msg
|
||||
_, _, err = user2Ws.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
// add invoice for user 1
|
||||
user1Invoice := suite.createAddInvoiceReq(1000, "integration test websocket user 1", suite.userToken)
|
||||
sendPaymentRequestUser1 := lnrpc.SendRequest{
|
||||
PaymentRequest: user1Invoice.PayReq,
|
||||
FeeLimit: nil,
|
||||
}
|
||||
// add invoice for user 2
|
||||
user2Invoice := suite.createAddInvoiceReq(1000, "integration test websocket user 2", suite.userToken2)
|
||||
sendPaymentRequestUser2 := lnrpc.SendRequest{
|
||||
PaymentRequest: user2Invoice.PayReq,
|
||||
FeeLimit: nil,
|
||||
}
|
||||
//pay invoices
|
||||
_, err = suite.fundingClient.SendPaymentSync(context.Background(), &sendPaymentRequestUser1)
|
||||
assert.NoError(suite.T(), err)
|
||||
_, err = suite.fundingClient.SendPaymentSync(context.Background(), &sendPaymentRequestUser2)
|
||||
assert.NoError(suite.T(), err)
|
||||
//read user 1 received msg
|
||||
_, user1Msg, err := user1Ws.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
//assert it's their's
|
||||
eventUser1 := ExpectedInvoiceEventWrapper{}
|
||||
err = json.Unmarshal([]byte(user1Msg), &eventUser1)
|
||||
assert.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), "integration test websocket user 1", eventUser1.Invoice.Description)
|
||||
//read user 2 received msg
|
||||
_, user2Msg, err := user2Ws.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
//assert it's their's
|
||||
eventUser2 := ExpectedInvoiceEventWrapper{}
|
||||
err = json.Unmarshal([]byte(user2Msg), &eventUser2)
|
||||
assert.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), "integration test websocket user 2", eventUser2.Invoice.Description)
|
||||
|
||||
}
|
||||
func (suite *WebSocketTestSuite) TestWebSocketMissingInvoice() {
|
||||
// create incoming invoice and fund account
|
||||
invoice1 := suite.createAddInvoiceReq(1000, "integration test websocket missing invoices", suite.userToken)
|
||||
sendPaymentRequest := lnrpc.SendRequest{
|
||||
PaymentRequest: invoice1.PayReq,
|
||||
FeeLimit: nil,
|
||||
}
|
||||
_, err := suite.fundingClient.SendPaymentSync(context.Background(), &sendPaymentRequest)
|
||||
assert.NoError(suite.T(), err)
|
||||
|
||||
// create 2nd invoice and pay it as well
|
||||
invoice2 := suite.createAddInvoiceReq(1000, "integration test websocket missing invoices 2nd", suite.userToken)
|
||||
sendPaymentRequest = lnrpc.SendRequest{
|
||||
PaymentRequest: invoice2.PayReq,
|
||||
FeeLimit: nil,
|
||||
}
|
||||
_, err = suite.fundingClient.SendPaymentSync(context.Background(), &sendPaymentRequest)
|
||||
assert.NoError(suite.T(), err)
|
||||
|
||||
//start listening to websocket after 2nd invoice has been paid
|
||||
//we should get an event for the 2nd invoice if we specify the hash as the query parameter
|
||||
ws, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("%s&since_payment_hash=%s", suite.wsUrl, invoice1.RHash), nil)
|
||||
assert.NoError(suite.T(), err)
|
||||
_, msg, err := ws.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
keepAlive := KeepAlive{}
|
||||
err = json.Unmarshal([]byte(msg), &keepAlive)
|
||||
assert.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), "keepalive", keepAlive.Type)
|
||||
|
||||
_, msg, err = ws.ReadMessage()
|
||||
assert.NoError(suite.T(), err)
|
||||
event := ExpectedInvoiceEventWrapper{}
|
||||
err = json.Unmarshal([]byte(msg), &event)
|
||||
assert.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), event.Type, "invoice")
|
||||
assert.Equal(suite.T(), int64(1000), event.Invoice.Amount)
|
||||
assert.Equal(suite.T(), "integration test websocket missing invoices 2nd", event.Invoice.Description)
|
||||
}
|
||||
|
||||
func (suite *WebSocketTestSuite) TearDownSuite() {
|
||||
suite.invoiceUpdateSubCancelFn()
|
||||
suite.websocketServer.Close()
|
||||
}
|
||||
|
||||
func TestWebSocketSuite(t *testing.T) {
|
||||
suite.Run(t, new(WebSocketTestSuite))
|
||||
}
|
||||
@@ -97,6 +97,7 @@ func (svc *LndhubService) ProcessInvoiceUpdate(ctx context.Context, rawInvoice *
|
||||
svc.Logger.Errorf("Failed to commit DB transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err)
|
||||
return err
|
||||
}
|
||||
svc.InvoicePubSub.Publish(invoice.UserID, invoice)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
56
lib/service/pubsub.go
Normal file
56
lib/service/pubsub.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/getAlby/lndhub.go/db/models"
|
||||
)
|
||||
|
||||
type Pubsub struct {
|
||||
mu sync.RWMutex
|
||||
subs map[int64]map[string]chan models.Invoice
|
||||
}
|
||||
|
||||
func NewPubsub() *Pubsub {
|
||||
ps := &Pubsub{}
|
||||
ps.subs = make(map[int64]map[string]chan models.Invoice)
|
||||
return ps
|
||||
}
|
||||
|
||||
func (ps *Pubsub) Subscribe(topic int64, ch chan models.Invoice) (subId string) {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
if ps.subs[topic] == nil {
|
||||
ps.subs[topic] = make(map[string]chan models.Invoice)
|
||||
}
|
||||
//re-use preimage code for a uuid
|
||||
subId = string(makePreimageHex())
|
||||
ps.subs[topic][subId] = ch
|
||||
return subId
|
||||
}
|
||||
|
||||
func (ps *Pubsub) Unsubscribe(id string, topic int64) {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
if ps.subs[topic] == nil {
|
||||
return
|
||||
}
|
||||
if ps.subs[topic][id] == nil {
|
||||
return
|
||||
}
|
||||
close(ps.subs[topic][id])
|
||||
delete(ps.subs[topic], id)
|
||||
}
|
||||
|
||||
func (ps *Pubsub) Publish(topic int64, msg models.Invoice) {
|
||||
ps.mu.RLock()
|
||||
defer ps.mu.RUnlock()
|
||||
|
||||
if ps.subs[topic] == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, ch := range ps.subs[topic] {
|
||||
ch <- msg
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ type LndhubService struct {
|
||||
LndClient lnd.LightningClientWrapper
|
||||
Logger *lecho.Logger
|
||||
IdentityPubkey string
|
||||
InvoicePubSub *Pubsub
|
||||
}
|
||||
|
||||
func (svc *LndhubService) GenerateToken(ctx context.Context, login, password, inRefreshToken string) (accessToken, refreshToken string, err error) {
|
||||
|
||||
@@ -81,8 +81,7 @@ func GenerateRefreshToken(secret []byte, expiryInSeconds int, u *models.User) (s
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func GetUserIdFromToken(secret []byte, token string) (int64, error) {
|
||||
func ParseToken(secret []byte, token string, mustBeRefresh bool) (int64, error) {
|
||||
userIdClaim := "id"
|
||||
isRefreshClaim := "isRefresh"
|
||||
claims := jwt.MapClaims{}
|
||||
@@ -100,7 +99,7 @@ func GetUserIdFromToken(secret []byte, token string) (int64, error) {
|
||||
|
||||
var userId interface{}
|
||||
for k, v := range claims {
|
||||
if k == isRefreshClaim && v.(bool) == false {
|
||||
if k == isRefreshClaim && !v.(bool) && mustBeRefresh {
|
||||
return -1, errors.New("This is not a refresh token")
|
||||
}
|
||||
if k == userIdClaim {
|
||||
@@ -114,3 +113,7 @@ func GetUserIdFromToken(secret []byte, token string) (int64, error) {
|
||||
|
||||
return int64(userId.(float64)), nil
|
||||
}
|
||||
|
||||
func GetUserIdFromToken(secret []byte, token string) (int64, error) {
|
||||
return ParseToken(secret, token, true)
|
||||
}
|
||||
|
||||
5
main.go
5
main.go
@@ -124,6 +124,7 @@ func main() {
|
||||
LndClient: lndClient,
|
||||
Logger: logger,
|
||||
IdentityPubkey: getInfo.IdentityPubkey,
|
||||
InvoicePubSub: service.NewPubsub(),
|
||||
}
|
||||
|
||||
strictRateLimitMiddleware := createRateLimitMiddleware(c.StrictRateLimit, c.BurstRateLimit)
|
||||
@@ -160,6 +161,10 @@ func main() {
|
||||
"/favicon.ico": "/static/img/favicon.png",
|
||||
}))
|
||||
|
||||
//invoice streaming
|
||||
//Authentication should be done through the query param because this is a websocket
|
||||
e.GET("/invoices/stream", controllers.NewInvoiceStreamController(svc).StreamInvoices)
|
||||
|
||||
// Subscribe to LND invoice updates in the background
|
||||
go svc.InvoiceUpdateSubscription(context.Background())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user