Merge pull request #5346 from joostjager/custom-messages

lnrpc+peer: custom peer messages
This commit is contained in:
Olaoluwa Osuntokun
2021-10-18 19:07:39 -07:00
committed by GitHub
18 changed files with 5041 additions and 3845 deletions

83
cmd/lncli/cmd_custom.go Normal file
View File

@@ -0,0 +1,83 @@
package main
import (
"encoding/hex"
"fmt"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/urfave/cli"
)
var sendCustomCommand = cli.Command{
Name: "sendcustom",
Flags: []cli.Flag{
cli.StringFlag{
Name: "peer",
},
cli.Uint64Flag{
Name: "type",
},
cli.StringFlag{
Name: "data",
},
},
Action: actionDecorator(sendCustom),
}
func sendCustom(ctx *cli.Context) error {
ctxc := getContext()
client, cleanUp := getClient(ctx)
defer cleanUp()
peer, err := hex.DecodeString(ctx.String("peer"))
if err != nil {
return err
}
msgType := ctx.Uint64("type")
data, err := hex.DecodeString(ctx.String("data"))
if err != nil {
return err
}
_, err = client.SendCustomMessage(
ctxc,
&lnrpc.SendCustomMessageRequest{
Peer: peer,
Type: uint32(msgType),
Data: data,
},
)
return err
}
var subscribeCustomCommand = cli.Command{
Name: "subscribecustom",
Action: actionDecorator(subscribeCustom),
}
func subscribeCustom(ctx *cli.Context) error {
ctxc := getContext()
client, cleanUp := getClient(ctx)
defer cleanUp()
stream, err := client.SubscribeCustomMessages(
ctxc,
&lnrpc.SubscribeCustomMessagesRequest{},
)
if err != nil {
return err
}
for {
msg, err := stream.Recv()
if err != nil {
return err
}
fmt.Printf("Received from peer %x: type=%d, data=%x\n",
msg.Peer, msg.Type, msg.Data)
}
}

View File

@@ -385,6 +385,8 @@ func main() {
profileSubCommand, profileSubCommand,
getStateCommand, getStateCommand,
deletePaymentsCommand, deletePaymentsCommand,
sendCustomCommand,
subscribeCustomCommand,
} }
// Add any extra commands determined by build flags. // Add any extra commands determined by build flags.

View File

@@ -212,7 +212,23 @@ If you use a strange system or changed group membership of the group running LND
you may want to check your system to see if it introduces additional risk for you may want to check your system to see if it introduces additional risk for
you. you.
## Safety ## Custom peer messages
Lightning nodes have a connection to each of their peers for exchanging
messages. In regular operation, these messages coordinate processes such as
channel opening and payment forwarding.
The lightning spec however also defines a custom range (>= 32768) for
experimental and application-specific peer messages.
With this release, [custom peer message
exchange](https://github.com/lightningnetwork/lnd/pull/5346) is added to open up
a range of new possibilities. Custom peer messages allow the lightning protocol
with its transport mechanisms (including tor) and public key authentication to
be leveraged for application-level communication. Note that peers exchange these
messages directly. There is no routing/path finding involved.
# Safety
* Locally force closed channels are now [kept in the channel.backup file until * Locally force closed channels are now [kept in the channel.backup file until
their time lock has fully matured](https://github.com/lightningnetwork/lnd/pull/5528). their time lock has fully matured](https://github.com/lightningnetwork/lnd/pull/5528).
@@ -511,6 +527,7 @@ change](https://github.com/lightningnetwork/lnd/pull/5613).
* Hampus Sjöberg * Hampus Sjöberg
* Harsha Goli * Harsha Goli
* Jesse de Wit * Jesse de Wit
* Joost Jager
* Martin Habovstiak * Martin Habovstiak
* Naveen Srinivasan * Naveen Srinivasan
* Oliver Gugger * Oliver Gugger

File diff suppressed because it is too large Load Diff

View File

@@ -2303,6 +2303,57 @@ func request_Lightning_RegisterRPCMiddleware_0(ctx context.Context, marshaler ru
return stream, metadata, nil return stream, metadata, nil
} }
func request_Lightning_SendCustomMessage_0(ctx context.Context, marshaler runtime.Marshaler, client LightningClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq SendCustomMessageRequest
var metadata runtime.ServerMetadata
newReader, berr := utilities.IOReaderFactory(req.Body)
if berr != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr)
}
if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.SendCustomMessage(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Lightning_SendCustomMessage_0(ctx context.Context, marshaler runtime.Marshaler, server LightningServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq SendCustomMessageRequest
var metadata runtime.ServerMetadata
newReader, berr := utilities.IOReaderFactory(req.Body)
if berr != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr)
}
if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.SendCustomMessage(ctx, &protoReq)
return msg, metadata, err
}
func request_Lightning_SubscribeCustomMessages_0(ctx context.Context, marshaler runtime.Marshaler, client LightningClient, req *http.Request, pathParams map[string]string) (Lightning_SubscribeCustomMessagesClient, runtime.ServerMetadata, error) {
var protoReq SubscribeCustomMessagesRequest
var metadata runtime.ServerMetadata
stream, err := client.SubscribeCustomMessages(ctx, &protoReq)
if err != nil {
return nil, metadata, err
}
header, err := stream.Header()
if err != nil {
return nil, metadata, err
}
metadata.HeaderMD = header
return stream, metadata, nil
}
// RegisterLightningHandlerServer registers the http handlers for service Lightning to "mux". // RegisterLightningHandlerServer registers the http handlers for service Lightning to "mux".
// UnaryRPC :call LightningServer directly. // UnaryRPC :call LightningServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
@@ -3559,6 +3610,36 @@ func RegisterLightningHandlerServer(ctx context.Context, mux *runtime.ServeMux,
return return
}) })
mux.Handle("POST", pattern_Lightning_SendCustomMessage_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/lnrpc.Lightning/SendCustomMessage", runtime.WithHTTPPathPattern("/v1/custommessage"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Lightning_SendCustomMessage_0(rctx, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_Lightning_SendCustomMessage_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Lightning_SubscribeCustomMessages_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport")
_, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
})
return nil return nil
} }
@@ -4840,6 +4921,46 @@ func RegisterLightningHandlerClient(ctx context.Context, mux *runtime.ServeMux,
}) })
mux.Handle("POST", pattern_Lightning_SendCustomMessage_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req, "/lnrpc.Lightning/SendCustomMessage", runtime.WithHTTPPathPattern("/v1/custommessage"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Lightning_SendCustomMessage_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_Lightning_SendCustomMessage_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Lightning_SubscribeCustomMessages_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, mux, req, "/lnrpc.Lightning/SubscribeCustomMessages", runtime.WithHTTPPathPattern("/v1/custommessage/subscribe"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Lightning_SubscribeCustomMessages_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_Lightning_SubscribeCustomMessages_0(ctx, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...)
})
return nil return nil
} }
@@ -4967,6 +5088,10 @@ var (
pattern_Lightning_CheckMacaroonPermissions_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "macaroon", "checkpermissions"}, "")) pattern_Lightning_CheckMacaroonPermissions_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "macaroon", "checkpermissions"}, ""))
pattern_Lightning_RegisterRPCMiddleware_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "middleware"}, "")) pattern_Lightning_RegisterRPCMiddleware_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "middleware"}, ""))
pattern_Lightning_SendCustomMessage_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "custommessage"}, ""))
pattern_Lightning_SubscribeCustomMessages_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "custommessage", "subscribe"}, ""))
) )
var ( var (
@@ -5093,4 +5218,8 @@ var (
forward_Lightning_CheckMacaroonPermissions_0 = runtime.ForwardResponseMessage forward_Lightning_CheckMacaroonPermissions_0 = runtime.ForwardResponseMessage
forward_Lightning_RegisterRPCMiddleware_0 = runtime.ForwardResponseStream forward_Lightning_RegisterRPCMiddleware_0 = runtime.ForwardResponseStream
forward_Lightning_SendCustomMessage_0 = runtime.ForwardResponseMessage
forward_Lightning_SubscribeCustomMessages_0 = runtime.ForwardResponseStream
) )

View File

@@ -1633,4 +1633,71 @@ func RegisterLightningJSONCallbacks(registry map[string]func(ctx context.Context
} }
callback(string(respBytes), nil) callback(string(respBytes), nil)
} }
registry["lnrpc.Lightning.SendCustomMessage"] = func(ctx context.Context,
conn *grpc.ClientConn, reqJSON string, callback func(string, error)) {
req := &SendCustomMessageRequest{}
err := marshaler.Unmarshal([]byte(reqJSON), req)
if err != nil {
callback("", err)
return
}
client := NewLightningClient(conn)
resp, err := client.SendCustomMessage(ctx, req)
if err != nil {
callback("", err)
return
}
respBytes, err := marshaler.Marshal(resp)
if err != nil {
callback("", err)
return
}
callback(string(respBytes), nil)
}
registry["lnrpc.Lightning.SubscribeCustomMessages"] = func(ctx context.Context,
conn *grpc.ClientConn, reqJSON string, callback func(string, error)) {
req := &SubscribeCustomMessagesRequest{}
err := marshaler.Unmarshal([]byte(reqJSON), req)
if err != nil {
callback("", err)
return
}
client := NewLightningClient(conn)
stream, err := client.SubscribeCustomMessages(ctx, req)
if err != nil {
callback("", err)
return
}
go func() {
for {
select {
case <-stream.Context().Done():
callback("", stream.Context().Err())
return
default:
}
resp, err := stream.Recv()
if err != nil {
callback("", err)
return
}
respBytes, err := marshaler.Marshal(resp)
if err != nil {
callback("", err)
return
}
callback(string(respBytes), nil)
}
}()
}
} }

View File

@@ -557,6 +557,47 @@ service Lightning {
*/ */
rpc RegisterRPCMiddleware (stream RPCMiddlewareResponse) rpc RegisterRPCMiddleware (stream RPCMiddlewareResponse)
returns (stream RPCMiddlewareRequest); returns (stream RPCMiddlewareRequest);
/* lncli: `sendcustom`
SendCustomMessage sends a custom peer message.
*/
rpc SendCustomMessage (SendCustomMessageRequest)
returns (SendCustomMessageResponse);
/* lncli: `subscribecustom`
SubscribeCustomMessages subscribes to a stream of incoming custom peer
messages.
*/
rpc SubscribeCustomMessages (SubscribeCustomMessagesRequest)
returns (stream CustomMessage);
}
message SubscribeCustomMessagesRequest {
}
message CustomMessage {
// Peer from which the message originates
bytes peer = 1;
// Message type. This value will be in the custom range (>= 32768).
uint32 type = 2;
// Raw message data
bytes data = 3;
}
message SendCustomMessageRequest {
// Peer to send the message to
bytes peer = 1;
// Message type. This value needs to be in the custom range (>= 32768).
uint32 type = 2;
// Raw message data.
bytes data = 3;
}
message SendCustomMessageResponse {
} }
message Utxo { message Utxo {

View File

@@ -851,6 +851,71 @@
] ]
} }
}, },
"/v1/custommessage": {
"post": {
"summary": "lncli: `sendcustom`\nSendCustomMessage sends a custom peer message.",
"operationId": "Lightning_SendCustomMessage",
"responses": {
"200": {
"description": "A successful response.",
"schema": {
"$ref": "#/definitions/lnrpcSendCustomMessageResponse"
}
},
"default": {
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/rpcStatus"
}
}
},
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/lnrpcSendCustomMessageRequest"
}
}
],
"tags": [
"Lightning"
]
}
},
"/v1/custommessage/subscribe": {
"get": {
"summary": "lncli: `subscribecustom`\nSubscribeCustomMessages subscribes to a stream of incoming custom peer\nmessages.",
"operationId": "Lightning_SubscribeCustomMessages",
"responses": {
"200": {
"description": "A successful response.(streaming responses)",
"schema": {
"type": "object",
"properties": {
"result": {
"$ref": "#/definitions/lnrpcCustomMessage"
},
"error": {
"$ref": "#/definitions/rpcStatus"
}
},
"title": "Stream result of lnrpcCustomMessage"
}
},
"default": {
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/rpcStatus"
}
}
},
"tags": [
"Lightning"
]
}
},
"/v1/debuglevel": { "/v1/debuglevel": {
"post": { "post": {
"summary": "lncli: `debuglevel`\nDebugLevel allows a caller to programmatically set the logging verbosity of\nlnd. The logging can be targeted according to a coarse daemon-wide logging\nlevel, or in a granular fashion to specify the logging for a target\nsub-system.", "summary": "lncli: `debuglevel`\nDebugLevel allows a caller to programmatically set the logging verbosity of\nlnd. The logging can be targeted according to a coarse daemon-wide logging\nlevel, or in a granular fashion to specify the logging for a target\nsub-system.",
@@ -1702,17 +1767,6 @@
} }
} }
}, },
"parameters": [
{
"name": "body",
"description": " (streaming inputs)",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/lnrpcRPCMiddlewareResponse"
}
}
],
"tags": [ "tags": [
"Lightning" "Lightning"
] ]
@@ -3808,6 +3862,26 @@
"lnrpcConnectPeerResponse": { "lnrpcConnectPeerResponse": {
"type": "object" "type": "object"
}, },
"lnrpcCustomMessage": {
"type": "object",
"properties": {
"peer": {
"type": "string",
"format": "byte",
"title": "Peer from which the message originates"
},
"type": {
"type": "integer",
"format": "int64",
"description": "Message type. This value will be in the custom range (\u003e= 32768)."
},
"data": {
"type": "string",
"format": "byte",
"title": "Raw message data"
}
}
},
"lnrpcDebugLevelRequest": { "lnrpcDebugLevelRequest": {
"type": "object", "type": "object",
"properties": { "properties": {
@@ -5766,24 +5840,6 @@
} }
} }
}, },
"lnrpcRPCMiddlewareResponse": {
"type": "object",
"properties": {
"request_id": {
"type": "string",
"format": "uint64",
"description": "The unique ID of the intercepted request that this response refers to. Must\nalways be set when giving feedback to an intercept but is ignored for the\ninitial registration message."
},
"register": {
"$ref": "#/definitions/lnrpcMiddlewareRegistration",
"title": "The registration message identifies the middleware that's being\nregistered in lnd. The registration message must be sent immediately\nafter initiating the RegisterRpcMiddleware stream, otherwise lnd will\ntime out the attempt and terminate the request. NOTE: The middleware\nwill only receive interception messages for requests that contain a\nmacaroon with the custom caveat that the middleware declares it is\nresponsible for handling in the registration message! As a security\nmeasure, _no_ middleware can intercept requests made with _unencumbered_\nmacaroons!"
},
"feedback": {
"$ref": "#/definitions/lnrpcInterceptFeedback",
"description": "The middleware received an interception request and gives feedback to\nit. The request_id indicates what message the feedback refers to."
}
}
},
"lnrpcReadyForPsbtFunding": { "lnrpcReadyForPsbtFunding": {
"type": "object", "type": "object",
"properties": { "properties": {
@@ -6008,6 +6064,29 @@
} }
} }
}, },
"lnrpcSendCustomMessageRequest": {
"type": "object",
"properties": {
"peer": {
"type": "string",
"format": "byte",
"title": "Peer to send the message to"
},
"type": {
"type": "integer",
"format": "int64",
"description": "Message type. This value needs to be in the custom range (\u003e= 32768)."
},
"data": {
"type": "string",
"format": "byte",
"description": "Raw message data."
}
}
},
"lnrpcSendCustomMessageResponse": {
"type": "object"
},
"lnrpcSendManyRequest": { "lnrpcSendManyRequest": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -155,4 +155,8 @@ http:
body: "*" body: "*"
- selector: lnrpc.Lightning.RegisterRPCMiddleware - selector: lnrpc.Lightning.RegisterRPCMiddleware
post: "/v1/middleware" post: "/v1/middleware"
- selector: lnrpc.Lightning.SendCustomMessage
post: "/v1/custommessage"
body: "*" body: "*"
- selector: lnrpc.Lightning.SubscribeCustomMessages
get: "/v1/custommessage/subscribe"

View File

@@ -403,6 +403,13 @@ type LightningClient interface {
//allowed to modify any responses. As a security measure, _no_ middleware can //allowed to modify any responses. As a security measure, _no_ middleware can
//modify responses for requests made with _unencumbered_ macaroons! //modify responses for requests made with _unencumbered_ macaroons!
RegisterRPCMiddleware(ctx context.Context, opts ...grpc.CallOption) (Lightning_RegisterRPCMiddlewareClient, error) RegisterRPCMiddleware(ctx context.Context, opts ...grpc.CallOption) (Lightning_RegisterRPCMiddlewareClient, error)
// lncli: `sendcustom`
//SendCustomMessage sends a custom peer message.
SendCustomMessage(ctx context.Context, in *SendCustomMessageRequest, opts ...grpc.CallOption) (*SendCustomMessageResponse, error)
// lncli: `subscribecustom`
//SubscribeCustomMessages subscribes to a stream of incoming custom peer
//messages.
SubscribeCustomMessages(ctx context.Context, in *SubscribeCustomMessagesRequest, opts ...grpc.CallOption) (Lightning_SubscribeCustomMessagesClient, error)
} }
type lightningClient struct { type lightningClient struct {
@@ -1254,6 +1261,47 @@ func (x *lightningRegisterRPCMiddlewareClient) Recv() (*RPCMiddlewareRequest, er
return m, nil return m, nil
} }
func (c *lightningClient) SendCustomMessage(ctx context.Context, in *SendCustomMessageRequest, opts ...grpc.CallOption) (*SendCustomMessageResponse, error) {
out := new(SendCustomMessageResponse)
err := c.cc.Invoke(ctx, "/lnrpc.Lightning/SendCustomMessage", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *lightningClient) SubscribeCustomMessages(ctx context.Context, in *SubscribeCustomMessagesRequest, opts ...grpc.CallOption) (Lightning_SubscribeCustomMessagesClient, error) {
stream, err := c.cc.NewStream(ctx, &Lightning_ServiceDesc.Streams[12], "/lnrpc.Lightning/SubscribeCustomMessages", opts...)
if err != nil {
return nil, err
}
x := &lightningSubscribeCustomMessagesClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Lightning_SubscribeCustomMessagesClient interface {
Recv() (*CustomMessage, error)
grpc.ClientStream
}
type lightningSubscribeCustomMessagesClient struct {
grpc.ClientStream
}
func (x *lightningSubscribeCustomMessagesClient) Recv() (*CustomMessage, error) {
m := new(CustomMessage)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// LightningServer is the server API for Lightning service. // LightningServer is the server API for Lightning service.
// All implementations must embed UnimplementedLightningServer // All implementations must embed UnimplementedLightningServer
// for forward compatibility // for forward compatibility
@@ -1643,6 +1691,13 @@ type LightningServer interface {
//allowed to modify any responses. As a security measure, _no_ middleware can //allowed to modify any responses. As a security measure, _no_ middleware can
//modify responses for requests made with _unencumbered_ macaroons! //modify responses for requests made with _unencumbered_ macaroons!
RegisterRPCMiddleware(Lightning_RegisterRPCMiddlewareServer) error RegisterRPCMiddleware(Lightning_RegisterRPCMiddlewareServer) error
// lncli: `sendcustom`
//SendCustomMessage sends a custom peer message.
SendCustomMessage(context.Context, *SendCustomMessageRequest) (*SendCustomMessageResponse, error)
// lncli: `subscribecustom`
//SubscribeCustomMessages subscribes to a stream of incoming custom peer
//messages.
SubscribeCustomMessages(*SubscribeCustomMessagesRequest, Lightning_SubscribeCustomMessagesServer) error
mustEmbedUnimplementedLightningServer() mustEmbedUnimplementedLightningServer()
} }
@@ -1839,6 +1894,12 @@ func (UnimplementedLightningServer) CheckMacaroonPermissions(context.Context, *C
func (UnimplementedLightningServer) RegisterRPCMiddleware(Lightning_RegisterRPCMiddlewareServer) error { func (UnimplementedLightningServer) RegisterRPCMiddleware(Lightning_RegisterRPCMiddlewareServer) error {
return status.Errorf(codes.Unimplemented, "method RegisterRPCMiddleware not implemented") return status.Errorf(codes.Unimplemented, "method RegisterRPCMiddleware not implemented")
} }
func (UnimplementedLightningServer) SendCustomMessage(context.Context, *SendCustomMessageRequest) (*SendCustomMessageResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SendCustomMessage not implemented")
}
func (UnimplementedLightningServer) SubscribeCustomMessages(*SubscribeCustomMessagesRequest, Lightning_SubscribeCustomMessagesServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeCustomMessages not implemented")
}
func (UnimplementedLightningServer) mustEmbedUnimplementedLightningServer() {} func (UnimplementedLightningServer) mustEmbedUnimplementedLightningServer() {}
// UnsafeLightningServer may be embedded to opt out of forward compatibility for this service. // UnsafeLightningServer may be embedded to opt out of forward compatibility for this service.
@@ -3042,6 +3103,45 @@ func (x *lightningRegisterRPCMiddlewareServer) Recv() (*RPCMiddlewareResponse, e
return m, nil return m, nil
} }
func _Lightning_SendCustomMessage_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SendCustomMessageRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(LightningServer).SendCustomMessage(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/lnrpc.Lightning/SendCustomMessage",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(LightningServer).SendCustomMessage(ctx, req.(*SendCustomMessageRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Lightning_SubscribeCustomMessages_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SubscribeCustomMessagesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(LightningServer).SubscribeCustomMessages(m, &lightningSubscribeCustomMessagesServer{stream})
}
type Lightning_SubscribeCustomMessagesServer interface {
Send(*CustomMessage) error
grpc.ServerStream
}
type lightningSubscribeCustomMessagesServer struct {
grpc.ServerStream
}
func (x *lightningSubscribeCustomMessagesServer) Send(m *CustomMessage) error {
return x.ServerStream.SendMsg(m)
}
// Lightning_ServiceDesc is the grpc.ServiceDesc for Lightning service. // Lightning_ServiceDesc is the grpc.ServiceDesc for Lightning service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@@ -3253,6 +3353,10 @@ var Lightning_ServiceDesc = grpc.ServiceDesc{
MethodName: "CheckMacaroonPermissions", MethodName: "CheckMacaroonPermissions",
Handler: _Lightning_CheckMacaroonPermissions_Handler, Handler: _Lightning_CheckMacaroonPermissions_Handler,
}, },
{
MethodName: "SendCustomMessage",
Handler: _Lightning_SendCustomMessage_Handler,
},
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {
@@ -3319,6 +3423,11 @@ var Lightning_ServiceDesc = grpc.ServiceDesc{
ServerStreams: true, ServerStreams: true,
ClientStreams: true, ClientStreams: true,
}, },
{
StreamName: "SubscribeCustomMessages",
Handler: _Lightning_SubscribeCustomMessages_Handler,
ServerStreams: true,
},
}, },
Metadata: "lightning.proto", Metadata: "lightning.proto",
} }

65
lnwire/custom.go Normal file
View File

@@ -0,0 +1,65 @@
package lnwire
import (
"bytes"
"errors"
"io"
)
// CustomTypeStart is the start of the custom type range for peer messages as
// defined in BOLT 01.
var CustomTypeStart MessageType = 32768
// Custom represents an application-defined wire message.
type Custom struct {
Type MessageType
Data []byte
}
// A compile time check to ensure FundingCreated implements the lnwire.Message
// interface.
var _ Message = (*Custom)(nil)
// NewCustom instanties a new custom message.
func NewCustom(msgType MessageType, data []byte) (*Custom, error) {
if msgType < CustomTypeStart {
return nil, errors.New("msg type not in custom range")
}
return &Custom{
Type: msgType,
Data: data,
}, nil
}
// Encode serializes the target Custom message into the passed io.Writer
// implementation.
//
// This is part of the lnwire.Message interface.
func (c *Custom) Encode(b *bytes.Buffer, pver uint32) error {
_, err := b.Write(c.Data)
return err
}
// Decode deserializes the serialized Custom message stored in the passed
// io.Reader into the target Custom message.
//
// This is part of the lnwire.Message interface.
func (c *Custom) Decode(r io.Reader, pver uint32) error {
var b bytes.Buffer
if _, err := io.Copy(&b, r); err != nil {
return err
}
c.Data = b.Bytes()
return nil
}
// MsgType returns the uint32 code which uniquely identifies this message as a
// Custom message on the wire.
//
// This is part of the lnwire.Message interface.
func (c *Custom) MsgType() MessageType {
return c.Type
}

View File

@@ -240,7 +240,7 @@ func TestMaxOutPointIndex(t *testing.T) {
func TestEmptyMessageUnknownType(t *testing.T) { func TestEmptyMessageUnknownType(t *testing.T) {
t.Parallel() t.Parallel()
fakeType := MessageType(math.MaxUint16) fakeType := CustomTypeStart - 1
if _, err := makeEmptyMessage(fakeType); err == nil { if _, err := makeEmptyMessage(fakeType); err == nil {
t.Fatalf("should not be able to make an empty message of an " + t.Fatalf("should not be able to make an empty message of an " +
"unknown type") "unknown type")

View File

@@ -233,8 +233,13 @@ func makeEmptyMessage(msgType MessageType) (Message, error) {
case MsgGossipTimestampRange: case MsgGossipTimestampRange:
msg = &GossipTimestampRange{} msg = &GossipTimestampRange{}
default: default:
if msgType < CustomTypeStart {
return nil, &UnknownMessage{msgType} return nil, &UnknownMessage{msgType}
} }
msg = &Custom{
Type: msgType,
}
}
return msg, nil return msg, nil
} }

View File

@@ -94,6 +94,11 @@ type newChannelMsg struct {
err chan error err chan error
} }
type customMsg struct {
peer [33]byte
msg lnwire.Custom
}
// closeMsg is a wrapper struct around any wire messages that deal with the // closeMsg is a wrapper struct around any wire messages that deal with the
// cooperative channel closure negotiation process. This struct includes the // cooperative channel closure negotiation process. This struct includes the
// raw channel ID targeted along with the original message. // raw channel ID targeted along with the original message.
@@ -318,6 +323,10 @@ type Config struct {
// that is accumulated before signing a new commitment. // that is accumulated before signing a new commitment.
ChannelCommitBatchSize uint32 ChannelCommitBatchSize uint32
// HandleCustomMessage is called whenever a custom message is received
// from the peer.
HandleCustomMessage func(peer [33]byte, msg *lnwire.Custom) error
// Quit is the server's quit channel. If this is closed, we halt operation. // Quit is the server's quit channel. If this is closed, we halt operation.
Quit chan struct{} Quit chan struct{}
} }
@@ -1449,6 +1458,13 @@ out:
discStream.AddMsg(msg) discStream.AddMsg(msg)
case *lnwire.Custom:
err := p.handleCustomMessage(msg)
if err != nil {
p.storeError(err)
peerLog.Errorf("peer: %v, %v", p, err)
}
default: default:
// If the message we received is unknown to us, store // If the message we received is unknown to us, store
// the type to track the failure. // the type to track the failure.
@@ -1486,6 +1502,17 @@ out:
peerLog.Tracef("readHandler for peer %v done", p) peerLog.Tracef("readHandler for peer %v done", p)
} }
// handleCustomMessage handles the given custom message if a handler is
// registered.
func (p *Brontide) handleCustomMessage(msg *lnwire.Custom) error {
if p.cfg.HandleCustomMessage == nil {
return fmt.Errorf("no custom message handler for "+
"message type %v", uint16(msg.MsgType()))
}
return p.cfg.HandleCustomMessage(p.PubKey(), msg)
}
// isActiveChannel returns true if the provided channel id is active, otherwise // isActiveChannel returns true if the provided channel id is active, otherwise
// returns false. // returns false.
func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool { func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool {
@@ -1686,6 +1713,8 @@ func messageSummary(msg lnwire.Message) string {
time.Unix(int64(msg.FirstTimestamp), 0), time.Unix(int64(msg.FirstTimestamp), 0),
msg.TimestampRange) msg.TimestampRange)
case *lnwire.Custom:
return fmt.Sprintf("type=%d", msg.Type)
} }
return "" return ""
@@ -1714,8 +1743,15 @@ func (p *Brontide) logWireMessage(msg lnwire.Message, read bool) {
preposition = "from" preposition = "from"
} }
var msgType string
if msg.MsgType() < lnwire.CustomTypeStart {
msgType = msg.MsgType().String()
} else {
msgType = "custom"
}
return fmt.Sprintf("%v %v%s %v %s", summaryPrefix, return fmt.Sprintf("%v %v%s %v %s", summaryPrefix,
msg.MsgType(), summary, preposition, p) msgType, summary, preposition, p)
})) }))
switch m := msg.(type) { switch m := msg.(type) {

View File

@@ -2,9 +2,11 @@ package peer
import ( import (
"bytes" "bytes"
"io/ioutil"
"testing" "testing"
"time" "time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
@@ -17,6 +19,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/chancloser" "github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/pool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@@ -1031,3 +1034,122 @@ func genScript(t *testing.T, address string) lnwire.DeliveryAddress {
return script return script
} }
// TestPeerCustomMessage tests custom message exchange between peers.
func TestPeerCustomMessage(t *testing.T) {
t.Parallel()
// Set up node Alice.
alicePath, err := ioutil.TempDir("", "alicedb")
require.NoError(t, err)
dbAlice, err := channeldb.Open(alicePath)
require.NoError(t, err)
aliceKey, err := btcec.NewPrivateKey(btcec.S256())
require.NoError(t, err)
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
)
writePool := pool.NewWrite(
writeBufferPool, 1, timeout,
)
require.NoError(t, writePool.Start())
readBufferPool := pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
readPool := pool.NewRead(
readBufferPool, 1, timeout,
)
require.NoError(t, readPool.Start())
mockConn := newMockConn(t, 1)
receivedCustomChan := make(chan *customMsg)
remoteKey := [33]byte{8}
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
alicePeer := NewBrontide(Config{
PubKeyBytes: remoteKey,
ChannelDB: dbAlice.ChannelStateDB(),
Addr: &lnwire.NetAddress{
IdentityKey: aliceKey.PubKey(),
},
PrunePersistentPeerConnection: func([33]byte) {},
Features: lnwire.EmptyFeatureVector(),
LegacyFeatures: lnwire.EmptyFeatureVector(),
WritePool: writePool,
ReadPool: readPool,
Conn: mockConn,
ChainNotifier: notifier,
HandleCustomMessage: func(
peer [33]byte, msg *lnwire.Custom) error {
receivedCustomChan <- &customMsg{
peer: peer,
msg: *msg,
}
return nil
},
})
// Set up the init sequence.
go func() {
// Read init message.
<-mockConn.writtenMessages
// Write the init reply message.
initReplyMsg := lnwire.NewInitMessage(
lnwire.NewRawFeatureVector(
lnwire.DataLossProtectRequired,
),
lnwire.NewRawFeatureVector(),
)
var b bytes.Buffer
_, err = lnwire.WriteMessage(&b, initReplyMsg, 0)
assert.NoError(t, err)
mockConn.readMessages <- b.Bytes()
}()
// Start the peer.
require.NoError(t, alicePeer.Start())
// Send a custom message.
customMsg, err := lnwire.NewCustom(
lnwire.MessageType(40000), []byte{1, 2, 3},
)
require.NoError(t, err)
require.NoError(t, alicePeer.SendMessageLazy(false, customMsg))
// Verify that it is passed down to the noise layer correctly.
writtenMsg := <-mockConn.writtenMessages
require.Equal(t, []byte{0x9c, 0x40, 0x1, 0x2, 0x3}, writtenMsg)
// Receive a custom message.
receivedCustomMsg, err := lnwire.NewCustom(
lnwire.MessageType(40001), []byte{4, 5, 6},
)
require.NoError(t, err)
receivedData := []byte{0x9c, 0x41, 0x4, 0x5, 0x6}
mockConn.readMessages <- receivedData
// Verify that it is propagated up to the custom message handler.
receivedCustom := <-receivedCustomChan
require.Equal(t, remoteKey, receivedCustom.peer)
require.Equal(t, receivedCustomMsg, &receivedCustom.msg)
}

View File

@@ -460,12 +460,16 @@ type mockMessageConn struct {
// writtenMessages is a channel that our mock pushes written messages into. // writtenMessages is a channel that our mock pushes written messages into.
writtenMessages chan []byte writtenMessages chan []byte
readMessages chan []byte
curReadMessage []byte
} }
func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn { func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn {
return &mockMessageConn{ return &mockMessageConn{
t: t, t: t,
writtenMessages: make(chan []byte, expectedMessages), writtenMessages: make(chan []byte, expectedMessages),
readMessages: make(chan []byte, 1),
} }
} }
@@ -502,3 +506,16 @@ func (m *mockMessageConn) assertWrite(expected []byte) {
m.t.Fatalf("timeout waiting for write: %v", expected) m.t.Fatalf("timeout waiting for write: %v", expected)
} }
} }
func (m *mockMessageConn) SetReadDeadline(t time.Time) error {
return nil
}
func (m *mockMessageConn) ReadNextHeader() (uint32, error) {
m.curReadMessage = <-m.readMessages
return uint32(len(m.curReadMessage)), nil
}
func (m *mockMessageConn) ReadNextBody(buf []byte) ([]byte, error) {
return m.curReadMessage, nil
}

View File

@@ -568,6 +568,14 @@ func MainRPCServerPermissions() map[string][]bakery.Op {
Entity: "macaroon", Entity: "macaroon",
Action: "write", Action: "write",
}}, }},
"/lnrpc.Lightning/SendCustomMessage": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/SubscribeCustomMessages": {{
Entity: "offchain",
Action: "read",
}},
} }
} }
@@ -7326,3 +7334,59 @@ func (r *rpcServer) RegisterRPCMiddleware(
return middleware.Run() return middleware.Run()
} }
// SendCustomMessage sends a custom peer message.
func (r *rpcServer) SendCustomMessage(ctx context.Context, req *lnrpc.SendCustomMessageRequest) (
*lnrpc.SendCustomMessageResponse, error) {
peer, err := route.NewVertexFromBytes(req.Peer)
if err != nil {
return nil, err
}
err = r.server.SendCustomMessage(
peer, lnwire.MessageType(req.Type), req.Data,
)
switch {
case err == ErrPeerNotConnected:
return nil, status.Error(codes.NotFound, err.Error())
case err != nil:
return nil, err
}
return &lnrpc.SendCustomMessageResponse{}, nil
}
// SubscribeCustomMessages subscribes to a stream of incoming custom peer
// messages.
func (r *rpcServer) SubscribeCustomMessages(req *lnrpc.SubscribeCustomMessagesRequest,
server lnrpc.Lightning_SubscribeCustomMessagesServer) error {
client, err := r.server.SubscribeCustomMessages()
if err != nil {
return err
}
defer client.Cancel()
for {
select {
case <-client.Quit():
return errors.New("shutdown")
case <-server.Context().Done():
return server.Context().Err()
case update := <-client.Updates():
customMsg := update.(*CustomMessage)
err := server.Send(&lnrpc.CustomMessage{
Peer: customMsg.Peer[:],
Data: customMsg.Msg.Data,
Type: uint32(customMsg.Msg.Type),
})
if err != nil {
return err
}
}
}
}

View File

@@ -307,6 +307,8 @@ type server struct {
// livelinessMonitor monitors that lnd has access to critical resources. // livelinessMonitor monitors that lnd has access to critical resources.
livelinessMonitor *healthcheck.Monitor livelinessMonitor *healthcheck.Monitor
customMessageServer *subscribe.Server
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@@ -395,6 +397,15 @@ func (s *server) updatePersistentPeerAddrs() error {
return nil return nil
} }
// CustomMessage is a custom message that is received from a peer.
type CustomMessage struct {
// Peer is the peer pubkey
Peer [33]byte
// Msg is the custom wire message.
Msg *lnwire.Custom
}
// parseAddr parses an address from its string format to a net.Addr. // parseAddr parses an address from its string format to a net.Addr.
func parseAddr(address string, netCfg tor.Net) (net.Addr, error) { func parseAddr(address string, netCfg tor.Net) (net.Addr, error) {
var ( var (
@@ -568,6 +579,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
peerDisconnectedListeners: make(map[string][]chan<- struct{}), peerDisconnectedListeners: make(map[string][]chan<- struct{}),
customMessageServer: subscribe.NewServer(),
featureMgr: featureMgr, featureMgr: featureMgr,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@@ -1637,6 +1650,11 @@ func (s *server) Start() error {
cleanup := cleaner{} cleanup := cleaner{}
s.start.Do(func() { s.start.Do(func() {
if err := s.customMessageServer.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.customMessageServer.Stop)
if s.hostAnn != nil { if s.hostAnn != nil {
if err := s.hostAnn.Start(); err != nil { if err := s.hostAnn.Start(); err != nil {
@@ -3336,6 +3354,24 @@ func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
delete(s.persistentConnReqs, pubStr) delete(s.persistentConnReqs, pubStr)
} }
// handleCustomMessage dispatches an incoming custom peers message to
// subscribers.
func (s *server) handleCustomMessage(peer [33]byte, msg *lnwire.Custom) error {
srvrLog.Debugf("Custom message received: peer=%x, type=%d",
peer, msg.Type)
return s.customMessageServer.SendUpdate(&CustomMessage{
Peer: peer,
Msg: msg,
})
}
// SubscribeCustomMessages subscribes to a stream of incoming custom peer
// messages.
func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
return s.customMessageServer.Subscribe()
}
// peerConnected is a function that handles initialization a newly connected // peerConnected is a function that handles initialization a newly connected
// peer by adding it to the server's global list of all active peers, and // peer by adding it to the server's global list of all active peers, and
// starting all the goroutines the peer needs to function properly. The inbound // starting all the goroutines the peer needs to function properly. The inbound
@@ -3431,6 +3467,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(), s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
ChannelCommitInterval: s.cfg.ChannelCommitInterval, ChannelCommitInterval: s.cfg.ChannelCommitInterval,
ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize, ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize,
HandleCustomMessage: s.handleCustomMessage,
Quit: s.quit, Quit: s.quit,
} }
@@ -4179,6 +4216,35 @@ func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error {
} }
} }
// SendCustomMessage sends a custom message to the peer with the specified
// pubkey.
func (s *server) SendCustomMessage(peerPub [33]byte, msgType lnwire.MessageType,
data []byte) error {
peer, err := s.FindPeerByPubStr(string(peerPub[:]))
if err != nil {
return err
}
// We'll wait until the peer is active.
select {
case <-peer.ActiveSignal():
case <-peer.QuitSignal():
return fmt.Errorf("peer %x disconnected", peerPub)
case <-s.quit:
return ErrServerShuttingDown
}
msg, err := lnwire.NewCustom(msgType, data)
if err != nil {
return err
}
// Send the message as low-priority. For now we assume that all
// application-defined message are low priority.
return peer.SendMessageLazy(true, msg)
}
// newSweepPkScriptGen creates closure that generates a new public key script // newSweepPkScriptGen creates closure that generates a new public key script
// which should be used to sweep any funds into the on-chain wallet. // which should be used to sweep any funds into the on-chain wallet.
// Specifically, the script generated is a version 0, pay-to-witness-pubkey-hash // Specifically, the script generated is a version 0, pay-to-witness-pubkey-hash