diff --git a/grpcserver/server.go b/grpcserver/server.go deleted file mode 100644 index 572193f..0000000 --- a/grpcserver/server.go +++ /dev/null @@ -1,95 +0,0 @@ -package grpcserver - -import ( - "context" - - "github.com/getAlby/lndhub.go/common" - "github.com/getAlby/lndhub.go/db/models" - "github.com/getAlby/lndhub.go/lib/service" - "github.com/getAlby/lndhub.go/lndhubrpc" - pb "github.com/getAlby/lndhub.go/lndhubrpc" - "google.golang.org/protobuf/types/known/timestamppb" -) - -// server is used to implement helloworld.GreeterServer. -type Server struct { - pb.UnimplementedInvoiceSubscriptionServer - svc *service.LndhubService - incomingInvoices chan models.Invoice - ctx context.Context -} - -func NewGrpcServer(svc *service.LndhubService, ctx context.Context) (*Server, error) { - incomingInvoices := make(chan models.Invoice) - _, err := svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming, incomingInvoices) - if err != nil { - return nil, err - } - return &Server{ - svc: svc, - incomingInvoices: incomingInvoices, - ctx: ctx, - }, nil -} - -func (s *Server) SubsribeInvoices(req *lndhubrpc.SubsribeInvoicesRequest, srv lndhubrpc.InvoiceSubscription_SubsribeInvoicesServer) error { - alreadySeenId := int64(-1) - if req.FromId != nil { - //look up all settled incoming invoices from a certain id - //and return them first - invoices := []models.Invoice{} - err := s.svc.DB.NewSelect().Model(&invoices).Where("state = 'settled'").Where("type = 'incoming'").Where("id > ?", *req.FromId).OrderExpr("id ASC").Scan(s.ctx) - if err != nil { - return err - } - //add this so we can avoid duplicates in case of a race condition - if len(invoices) != 0 { - alreadySeenId = int64(invoices[len(invoices)-1].ID) - } - for _, inv := range invoices { - srv.Send(convertInvoice(inv)) - } - } - for { - select { - case <-s.ctx.Done(): - return nil - case inv := <-s.incomingInvoices: - //in case we've already send it over - if inv.ID > alreadySeenId { - srv.Send(convertInvoice(inv)) - } - } - } -} - -func convertInvoice(inv models.Invoice) *pb.Invoice { - customRecords := []*pb.Invoice_CustomRecords{} - for key, value := range inv.DestinationCustomRecords { - customRecords = append(customRecords, &pb.Invoice_CustomRecords{ - //todo: fix types - Key: key, - Value: value, - }) - } - return &pb.Invoice{ - Id: uint32(inv.ID), - Type: inv.Type, - UserId: uint32(inv.UserID), - Amount: uint32(inv.Amount), - Fee: uint32(inv.Fee), - Memo: inv.Memo, - DescriptionHash: inv.DescriptionHash, - PaymentRequest: inv.PaymentRequest, - DestinationPubkeyHex: inv.DestinationPubkeyHex, - CustomRecords: customRecords, - RHash: inv.RHash, - Preimage: inv.Preimage, - Keysend: inv.Keysend, - State: inv.State, - CreatedAt: timestamppb.New(inv.CreatedAt), - ExpiresAt: timestamppb.New(inv.ExpiresAt.Time), - UpdatedAt: timestamppb.New(inv.UpdatedAt.Time), - SettledAt: timestamppb.New(inv.SettledAt.Time), - } -} diff --git a/main.go b/main.go index b20fd14..be831dd 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,6 @@ import ( "embed" "fmt" "log" - "net" "net/http" "os" "os/signal" @@ -17,13 +16,11 @@ import ( "github.com/getAlby/lndhub.go/db" "github.com/getAlby/lndhub.go/db/migrations" "github.com/getAlby/lndhub.go/docs" - "github.com/getAlby/lndhub.go/grpcserver" "github.com/getAlby/lndhub.go/lib" "github.com/getAlby/lndhub.go/lib/responses" "github.com/getAlby/lndhub.go/lib/service" "github.com/getAlby/lndhub.go/lib/tokens" "github.com/getAlby/lndhub.go/lnd" - "github.com/getAlby/lndhub.go/lndhubrpc" "github.com/getsentry/sentry-go" sentryecho "github.com/getsentry/sentry-go/echo" "github.com/go-playground/validator/v10" @@ -37,7 +34,6 @@ import ( "github.com/uptrace/bun/migrate" "github.com/ziflex/lecho/v3" "golang.org/x/time/rate" - "google.golang.org/grpc" ) //go:embed templates/index.html @@ -186,7 +182,7 @@ func main() { if svc.Config.EnableGRPC { //start grpc server grpcContext, grpcCancel := context.WithCancel(context.Background()) - go StartGrpcServer(svc, grpcContext) + go svc.StartGrpcServer(grpcContext) defer grpcCancel() } @@ -233,23 +229,6 @@ func main() { } -func StartGrpcServer(svc *service.LndhubService, ctx context.Context) { - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", svc.Config.GRPCPort)) - if err != nil { - svc.Logger.Fatalf("Failed to start grpc server: %v", err) - } - s := grpc.NewServer() - grpcServer, err := grpcserver.NewGrpcServer(svc, ctx) - if err != nil { - svc.Logger.Fatalf("Failed to init grpc server, %s", err.Error()) - } - lndhubrpc.RegisterInvoiceSubscriptionServer(s, grpcServer) - svc.Logger.Infof("gRPC server started at %v", lis.Addr()) - if err := s.Serve(lis); err != nil { - svc.Logger.Fatalf("failed to serve: %v", err) - } -} - func createRateLimitMiddleware(seconds int, burst int) echo.MiddlewareFunc { config := middleware.RateLimiterMemoryStoreConfig{ Rate: rate.Every(time.Duration(seconds) * time.Second),