Merge pull request #53 from ellemouton/dynamicPrice

multi: Add gRPC price server option
This commit is contained in:
Olaoluwa Osuntokun
2021-08-25 19:59:36 -07:00
committed by GitHub
9 changed files with 497 additions and 8 deletions

View File

@@ -276,7 +276,7 @@ func (a *Aperture) Stop() error {
// Shut down our client and server connections now. This should cause
// the first goroutine to quit.
cleanup(a.etcdClient, a.httpsServer)
cleanup(a.etcdClient, a.httpsServer, a.proxy)
// If we started a tor server as well, shut it down now too to cause the
// second goroutine to quit.
@@ -514,7 +514,10 @@ func createProxy(cfg *Config, challenger *LndChallenger,
}
// cleanup closes the given server and shuts down the log rotator.
func cleanup(etcdClient io.Closer, server io.Closer) {
func cleanup(etcdClient io.Closer, server io.Closer, proxy io.Closer) {
if err := proxy.Close(); err != nil {
log.Errorf("Error terminating proxy: %v", err)
}
if err := etcdClient.Close(); err != nil {
log.Errorf("Error terminating etcd client: %v", err)
}

29
pricer/defaultPricer.go Normal file
View File

@@ -0,0 +1,29 @@
package pricer
import "context"
// DefaultPricer provides the same price for any service path. It implements
// the Pricer interface.
type DefaultPricer struct {
Price int64
}
// NewDefaultPricer initialises a new DefaultPricer provider where each resource
// for the service will have the same price.
func NewDefaultPricer(price int64) *DefaultPricer {
return &DefaultPricer{Price: price}
}
// GetPrice returns the price charged for all resources of a service.
// It is part of the Pricer interface.
func (d *DefaultPricer) GetPrice(_ context.Context, _ string) (int64,
error) {
return d.Price, nil
}
// Close is part of the Pricer interface. For the DefaultPricer, the method does
// nothing.
func (d *DefaultPricer) Close() error {
return nil
}

85
pricer/grpcPricer.go Normal file
View File

@@ -0,0 +1,85 @@
package pricer
import (
"context"
"fmt"
"github.com/lightninglabs/aperture/pricesrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// Config holds all the config values required to initialise the GRPCPricer.
type Config struct {
// Enabled indicates if the grpcPricer is to be used.
Enabled bool `long:"enabled" description:"Set to true if a gRPC server is available to query for price data"`
// GRPCAddress is the address that the pricer gRPC server is serving on.
GRPCAddress string `long:"grpcaddress" description:"gRPC addr to use for price info for service resources"`
// Insecure indicates if the connection to the gRPC server should use
// TLS encryption or not.
Insecure bool `long:"insecure" description:"Set to true if no TLS encryption is to be used"`
// TLSCertPath is the path the the tls cert used by the price server.
TLSCertPath string `long:"tlscertpath" description:"Path to the servers tls cert"`
}
// GRPCPricer uses the pricesrpc PricesClient to query a backend server for
// the price of a service resource given the resource path. It implements the
// Pricer interface.
type GRPCPricer struct {
rpcConn *grpc.ClientConn
rpcClient pricesrpc.PricesClient
}
// NewGRPCPricer initialises a Pricer backed by a gRPC backend server.
func NewGRPCPricer(cfg *Config) (*GRPCPricer, error) {
var (
c GRPCPricer
err error
opt grpc.DialOption
)
if cfg.Insecure {
opt = grpc.WithInsecure()
} else {
tlsCredentials, err := credentials.NewClientTLSFromFile(
cfg.TLSCertPath, "",
)
if err != nil {
return nil, fmt.Errorf(
"unable to load TLS cert %s: %v",
cfg.TLSCertPath, err,
)
}
opt = grpc.WithTransportCredentials(tlsCredentials)
}
c.rpcConn, err = grpc.Dial(cfg.GRPCAddress, opt)
if err != nil {
return nil, err
}
c.rpcClient = pricesrpc.NewPricesClient(c.rpcConn)
return &c, nil
}
// GetPrice queries the server for the price of a resource path and returns the
// price. GetPrice is part of the Pricer interface.
func (c GRPCPricer) GetPrice(ctx context.Context, path string) (int64, error) {
resp, err := c.rpcClient.GetPrice(ctx, &pricesrpc.GetPriceRequest{
Path: path,
})
if err != nil {
return 0, err
}
return resp.Price, nil
}
// Close closes the gRPC connection. It is part of the Pricer interface.
func (c GRPCPricer) Close() error {
return c.rpcConn.Close()
}

13
pricer/pricer.go Normal file
View File

@@ -0,0 +1,13 @@
package pricer
import "context"
// Pricer is an interface used to query price data from a price provider.
type Pricer interface {
// GetPrice should return the price in satoshis for the given
// resource path.
GetPrice(ctx context.Context, path string) (int64, error)
// Close should clean up the Pricer implementation if needed.
Close() error
}

206
pricesrpc/rpc.pb.go Normal file
View File

@@ -0,0 +1,206 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: pricesrpc/rpc.proto
package pricesrpc
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type GetPriceRequest struct {
Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *GetPriceRequest) Reset() { *m = GetPriceRequest{} }
func (m *GetPriceRequest) String() string { return proto.CompactTextString(m) }
func (*GetPriceRequest) ProtoMessage() {}
func (*GetPriceRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_a48c5d96e99c79ae, []int{0}
}
func (m *GetPriceRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_GetPriceRequest.Unmarshal(m, b)
}
func (m *GetPriceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_GetPriceRequest.Marshal(b, m, deterministic)
}
func (m *GetPriceRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_GetPriceRequest.Merge(m, src)
}
func (m *GetPriceRequest) XXX_Size() int {
return xxx_messageInfo_GetPriceRequest.Size(m)
}
func (m *GetPriceRequest) XXX_DiscardUnknown() {
xxx_messageInfo_GetPriceRequest.DiscardUnknown(m)
}
var xxx_messageInfo_GetPriceRequest proto.InternalMessageInfo
func (m *GetPriceRequest) GetPath() string {
if m != nil {
return m.Path
}
return ""
}
type GetPriceResponse struct {
Price int64 `protobuf:"varint,3,opt,name=price,proto3" json:"price,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *GetPriceResponse) Reset() { *m = GetPriceResponse{} }
func (m *GetPriceResponse) String() string { return proto.CompactTextString(m) }
func (*GetPriceResponse) ProtoMessage() {}
func (*GetPriceResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_a48c5d96e99c79ae, []int{1}
}
func (m *GetPriceResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_GetPriceResponse.Unmarshal(m, b)
}
func (m *GetPriceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_GetPriceResponse.Marshal(b, m, deterministic)
}
func (m *GetPriceResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_GetPriceResponse.Merge(m, src)
}
func (m *GetPriceResponse) XXX_Size() int {
return xxx_messageInfo_GetPriceResponse.Size(m)
}
func (m *GetPriceResponse) XXX_DiscardUnknown() {
xxx_messageInfo_GetPriceResponse.DiscardUnknown(m)
}
var xxx_messageInfo_GetPriceResponse proto.InternalMessageInfo
func (m *GetPriceResponse) GetPrice() int64 {
if m != nil {
return m.Price
}
return 0
}
func init() {
proto.RegisterType((*GetPriceRequest)(nil), "pricesrpc.GetPriceRequest")
proto.RegisterType((*GetPriceResponse)(nil), "pricesrpc.GetPriceResponse")
}
func init() { proto.RegisterFile("pricesrpc/rpc.proto", fileDescriptor_a48c5d96e99c79ae) }
var fileDescriptor_a48c5d96e99c79ae = []byte{
// 182 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2e, 0x28, 0xca, 0x4c,
0x4e, 0x2d, 0x2e, 0x2a, 0x48, 0xd6, 0x2f, 0x2a, 0x48, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17,
0xe2, 0x84, 0x0b, 0x2a, 0xa9, 0x72, 0xf1, 0xbb, 0xa7, 0x96, 0x04, 0x80, 0xf8, 0x41, 0xa9, 0x85,
0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x42, 0x5c, 0x2c, 0x05, 0x89, 0x25, 0x19, 0x12, 0x8c, 0x0a, 0x8c,
0x1a, 0x9c, 0x41, 0x60, 0xb6, 0x92, 0x06, 0x97, 0x00, 0x42, 0x59, 0x71, 0x41, 0x7e, 0x5e, 0x71,
0xaa, 0x90, 0x08, 0x17, 0x2b, 0xd8, 0x1c, 0x09, 0x66, 0x05, 0x46, 0x0d, 0xe6, 0x20, 0x08, 0xc7,
0xc8, 0x97, 0x8b, 0x0d, 0xac, 0xac, 0x58, 0xc8, 0x99, 0x8b, 0x03, 0xa6, 0x47, 0x48, 0x4a, 0x0f,
0x6e, 0xa5, 0x1e, 0x9a, 0x7d, 0x52, 0xd2, 0x58, 0xe5, 0x20, 0x96, 0x38, 0xe9, 0x46, 0x69, 0xa7,
0x67, 0x96, 0x64, 0x94, 0x26, 0xe9, 0x25, 0xe7, 0xe7, 0xea, 0xe7, 0x64, 0xa6, 0x67, 0x94, 0xe4,
0x65, 0xe6, 0xa5, 0xe7, 0x24, 0x26, 0x15, 0xeb, 0x27, 0x16, 0xa4, 0x16, 0x95, 0x94, 0x16, 0xa5,
0xea, 0xc3, 0xf5, 0x27, 0xb1, 0x81, 0x3d, 0x68, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x0d, 0x3d,
0x77, 0x8f, 0xf7, 0x00, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// PricesClient is the client API for Prices service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type PricesClient interface {
GetPrice(ctx context.Context, in *GetPriceRequest, opts ...grpc.CallOption) (*GetPriceResponse, error)
}
type pricesClient struct {
cc *grpc.ClientConn
}
func NewPricesClient(cc *grpc.ClientConn) PricesClient {
return &pricesClient{cc}
}
func (c *pricesClient) GetPrice(ctx context.Context, in *GetPriceRequest, opts ...grpc.CallOption) (*GetPriceResponse, error) {
out := new(GetPriceResponse)
err := c.cc.Invoke(ctx, "/pricesrpc.Prices/GetPrice", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// PricesServer is the server API for Prices service.
type PricesServer interface {
GetPrice(context.Context, *GetPriceRequest) (*GetPriceResponse, error)
}
// UnimplementedPricesServer can be embedded to have forward compatible implementations.
type UnimplementedPricesServer struct {
}
func (*UnimplementedPricesServer) GetPrice(ctx context.Context, req *GetPriceRequest) (*GetPriceResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetPrice not implemented")
}
func RegisterPricesServer(s *grpc.Server, srv PricesServer) {
s.RegisterService(&_Prices_serviceDesc, srv)
}
func _Prices_GetPrice_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetPriceRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(PricesServer).GetPrice(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pricesrpc.Prices/GetPrice",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PricesServer).GetPrice(ctx, req.(*GetPriceRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Prices_serviceDesc = grpc.ServiceDesc{
ServiceName: "pricesrpc.Prices",
HandlerType: (*PricesServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "GetPrice",
Handler: _Prices_GetPrice_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pricesrpc/rpc.proto",
}

17
pricesrpc/rpc.proto Normal file
View File

@@ -0,0 +1,17 @@
syntax="proto3";
package pricesrpc;
option go_package = "github.com/lightninglabs/aperture/pricesrpc";
service Prices {
rpc GetPrice(GetPriceRequest) returns (GetPriceResponse);
}
message GetPriceRequest {
string path = 1;
}
message GetPriceResponse {
int64 price = 3;
}

View File

@@ -104,21 +104,50 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
resourceName := target.ResourceName(r.URL.Path)
// Determine auth level required to access service and dispatch request
// accordingly.
authLevel := target.AuthRequired(r)
switch {
case authLevel.IsOn():
if !p.authenticator.Accept(&r.Header, target.Name) {
// Determine if the header contains the authentication
// required for the given resource. The call to Accept is
// called in each case body rather than outside the switch so
// as to avoid calling this possibly expensive call for static
// resources.
acceptAuth := p.authenticator.Accept(&r.Header, resourceName)
if !acceptAuth {
price, err := target.pricer.GetPrice(
r.Context(), r.URL.Path,
)
if err != nil {
prefixLog.Errorf("error getting "+
"resource price: %v", err)
sendDirectResponse(
w, r, http.StatusInternalServerError,
"failure fetching "+
"resource price",
)
return
}
// If the price returned is zero, then break out of the
// switch statement and allow access to the service.
if price == 0 {
break
}
prefixLog.Infof("Authentication failed. Sending 402.")
p.handlePaymentRequired(w, r, target.Name, target.Price)
p.handlePaymentRequired(w, r, resourceName, price)
return
}
case authLevel.IsFreebie():
// We only need to respect the freebie counter if the user
// is not authenticated at all.
if !p.authenticator.Accept(&r.Header, target.Name) {
acceptAuth := p.authenticator.Accept(&r.Header, resourceName)
if !acceptAuth {
ok, err := target.freebieDb.CanPass(r, remoteIP)
if err != nil {
prefixLog.Errorf("Error querying freebie db: "+
@@ -130,7 +159,30 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if !ok {
p.handlePaymentRequired(w, r, target.Name, target.Price)
price, err := target.pricer.GetPrice(
r.Context(), r.URL.Path,
)
if err != nil {
prefixLog.Errorf("error getting "+
"resource price: %v", err)
sendDirectResponse(
w, r, http.StatusInternalServerError,
"failure fetching "+
"resource price",
)
return
}
// If the price returned is zero, then break
// out of the switch statement and allow access
// to the service.
if price == 0 {
break
}
p.handlePaymentRequired(
w, r, resourceName, target.Price,
)
return
}
_, err = target.freebieDb.TallyFreebie(r, remoteIP)
@@ -186,6 +238,20 @@ func (p *Proxy) UpdateServices(services []*Service) error {
return nil
}
// Close cleans up the Proxy by closing any remaining open connections.
func (p *Proxy) Close() error {
var returnErr error
for _, s := range p.services {
if err := s.pricer.Close(); err != nil {
log.Errorf("error while closing the pricer of "+
"service %s: %v", s.Name, err)
returnErr = err
}
}
return returnErr
}
// director is a method that rewrites an incoming request to be forwarded to a
// backend service.
func (p *Proxy) director(req *http.Request) {

View File

@@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcutil"
"github.com/lightninglabs/aperture/auth"
"github.com/lightninglabs/aperture/freebie"
"github.com/lightninglabs/aperture/pricer"
)
var (
@@ -84,6 +85,10 @@ type Service struct {
// service's endpoint.
Price int64 `long:"price" description:"Static LSAT value in satoshis to be used for this service"`
// DynamicPrice holds the config options needed for initialising
// the pricer if a gPRC server is to be used for price data.
DynamicPrice pricer.Config `long:"dynamicprice" description:"Configuration for connecting to the gRPC server to use for the pricer backend"`
// AuthWhitelistPaths is an optional list of regular expressions that
// are matched against the path of the URL of a request. If the request
// URL matches any of those regular expressions, the call is treated as
@@ -93,6 +98,20 @@ type Service struct {
AuthWhitelistPaths []string `long:"authwhitelistpaths" description:"List of regular expressions for paths that don't require authentication'"`
freebieDb freebie.DB
pricer pricer.Pricer
}
// ResourceName returns the string to be used to identify which resource a
// macaroon has access to. If DynamicPrice Enabled option is set to true then
// the service has further restrictions per resource and so the name will
// include both the service name and the specific resource name. Otherwise
// authorisation is only restricted by service name.
func (s *Service) ResourceName(resourcePath string) string {
if s.DynamicPrice.Enabled {
return fmt.Sprintf("%s%s", s.Name, resourcePath)
}
return s.Name
}
// AuthRequired determines the auth level required for a given request.
@@ -170,6 +189,22 @@ func prepareServices(services []*Service) error {
}
}
// If dynamic prices are enabled then use the provided
// DynamicPrice options to initialise a gRPC backed
// pricer client.
if service.DynamicPrice.Enabled {
priceClient, err := pricer.NewGRPCPricer(
&service.DynamicPrice,
)
if err != nil {
return fmt.Errorf("error initializing "+
"pricer: %v", err)
}
service.pricer = priceClient
continue
}
// Check that the price for the service is not negative and not
// more than the maximum amount allowed by lnd. If no price, or
// a price of zero satoshis, is set the then default price of 1
@@ -186,6 +221,10 @@ func prepareServices(services []*Service) error {
return fmt.Errorf("maximum price exceeded for "+
"service %s", service.Name)
}
// Initialise a default pricer where all resources in a server
// are given the same price.
service.pricer = pricer.NewDefaultPricer(service.Price)
}
return nil
}

View File

@@ -82,8 +82,26 @@ services:
constraints:
"valid_until": "2020-01-01"
# The LSAT value in satoshis for the service.
price: 1
# The LSAT value in satoshis for the service. It is ignored if
# dynamicprice.enabled is set to true.
price: 0
# Options to use for connection to the price serving gRPC server.
dynamicprice:
# Whether or not a gRPC server is available to query price data from. If
# this option is set to true then the 'price' option is ignored.
enabled: true
# The address of the gRPC pricer server.
grpcaddress: "127.0.0.1:10010"
# Whether or not TLS encryption should be used for communications with the
# gRPC server.
insecure: false
# The path to the pricer server's tls.cert. If the 'insecure' option is
# set to true then this path must be set.
tlscertpath: "path-to-pricer-server-tls-cert/tls.cert"
- name: "service2"
hostregexp: "service2.com:8083"
@@ -94,6 +112,19 @@ services:
"valid_until": "2020-01-01"
price: 1
- name: "service3"
hostregexp: "service3.com:8083"
pathregexp: '^/.*$'
address: "123.456.789:8082"
protocol: https
constraints:
"valid_until": "2020-01-01"
dynamicprice:
enbled: true
grpcaddress: 123.456.789:8083
insecure: false
tlscertpath: "path-to-pricer-server-tls-cert/tls.cert"
# Settings for a Tor instance to allow requests over Tor as onion services.
# Configuring Tor is optional.
tor: