mirror of
https://github.com/lightninglabs/aperture.git
synced 2025-12-17 09:04:19 +01:00
244 lines
6.5 KiB
Go
244 lines
6.5 KiB
Go
package aperture
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"fmt"
|
|
"math"
|
|
"net/http"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
|
|
"github.com/lightningnetwork/lnd/build"
|
|
"github.com/lightningnetwork/lnd/lntest/wait"
|
|
"github.com/lightningnetwork/lnd/signal"
|
|
"github.com/stretchr/testify/require"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
var (
|
|
testApertureAddress = "localhost:8082"
|
|
testSID = streamID{1, 2, 3}
|
|
testStreamDesc = &hashmailrpc.CipherBoxDesc{
|
|
StreamId: testSID[:],
|
|
}
|
|
testMessage = []byte("I'm a message!")
|
|
apertureStartTimeout = 3 * time.Second
|
|
)
|
|
|
|
func TestHashMailServerReturnStream(t *testing.T) {
|
|
ctxb := context.Background()
|
|
|
|
setupAperture(t)
|
|
|
|
// Create a client and connect it to the server.
|
|
conn, err := grpc.Dial(testApertureAddress, grpc.WithInsecure())
|
|
require.NoError(t, err)
|
|
client := hashmailrpc.NewHashMailClient(conn)
|
|
|
|
// We'll create a new cipher box that we're going to subscribe to
|
|
// multiple times to check disconnecting returns the read stream.
|
|
resp, err := client.NewCipherBox(ctxb, &hashmailrpc.CipherBoxAuth{
|
|
Auth: &hashmailrpc.CipherBoxAuth_LndAuth{},
|
|
Desc: testStreamDesc,
|
|
})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, resp.GetSuccess())
|
|
|
|
// First we make sure there is something to read on the other end of
|
|
// that stream by writing something to it.
|
|
sendCtx, sendCancel := context.WithCancel(context.Background())
|
|
defer sendCancel()
|
|
|
|
writeStream, err := client.SendStream(sendCtx)
|
|
require.NoError(t, err)
|
|
err = writeStream.Send(&hashmailrpc.CipherBox{
|
|
Desc: testStreamDesc,
|
|
Msg: testMessage,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// We need to wait a bit to make sure the message is really sent.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Connect, wait for the stream to be ready, read something, then
|
|
// disconnect immediately.
|
|
msg, err := readMsgFromStream(t, client)
|
|
require.NoError(t, err)
|
|
require.Equal(t, testMessage, msg.Msg)
|
|
|
|
// Make sure we can connect again immediately and try to read something.
|
|
// There is no message to read before we cancel the request so we expect
|
|
// an EOF error to be returned upon connection close/context cancel.
|
|
_, err = readMsgFromStream(t, client)
|
|
require.Error(t, err)
|
|
require.Contains(t, err.Error(), "context canceled")
|
|
|
|
// Send then receive yet another message to make sure the stream is
|
|
// still operational.
|
|
testMessage2 := append(testMessage, []byte("test")...)
|
|
err = writeStream.Send(&hashmailrpc.CipherBox{
|
|
Desc: testStreamDesc,
|
|
Msg: testMessage2,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// We need to wait a bit to make sure the message is really sent.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
msg, err = readMsgFromStream(t, client)
|
|
require.NoError(t, err)
|
|
require.Equal(t, testMessage2, msg.Msg)
|
|
|
|
// Clean up the stream now.
|
|
_, err = client.DelCipherBox(ctxb, &hashmailrpc.CipherBoxAuth{
|
|
Auth: &hashmailrpc.CipherBoxAuth_LndAuth{},
|
|
Desc: testStreamDesc,
|
|
})
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func TestHashMailServerLargeMessage(t *testing.T) {
|
|
ctxb := context.Background()
|
|
|
|
setupAperture(t)
|
|
|
|
// Create a client and connect it to the server.
|
|
conn, err := grpc.Dial(testApertureAddress, grpc.WithInsecure())
|
|
require.NoError(t, err)
|
|
client := hashmailrpc.NewHashMailClient(conn)
|
|
|
|
// We'll create a new cipher box that we're going to subscribe to
|
|
// multiple times to check disconnecting returns the read stream.
|
|
resp, err := client.NewCipherBox(ctxb, &hashmailrpc.CipherBoxAuth{
|
|
Auth: &hashmailrpc.CipherBoxAuth_LndAuth{},
|
|
Desc: testStreamDesc,
|
|
})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, resp.GetSuccess())
|
|
|
|
// Let's create a long message and try to send it.
|
|
var largeMessage [512 * DefaultBufSize]byte
|
|
_, err = rand.Read(largeMessage[:])
|
|
require.NoError(t, err)
|
|
|
|
sendCtx, sendCancel := context.WithCancel(context.Background())
|
|
defer sendCancel()
|
|
|
|
writeStream, err := client.SendStream(sendCtx)
|
|
require.NoError(t, err)
|
|
err = writeStream.Send(&hashmailrpc.CipherBox{
|
|
Desc: testStreamDesc,
|
|
Msg: largeMessage[:],
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// We need to wait a bit to make sure the message is really sent.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Connect, wait for the stream to be ready, read something, then
|
|
// disconnect immediately.
|
|
msg, err := readMsgFromStream(t, client)
|
|
require.NoError(t, err)
|
|
require.Equal(t, largeMessage[:], msg.Msg)
|
|
}
|
|
|
|
func setupAperture(t *testing.T) {
|
|
logWriter := build.NewRotatingLogWriter()
|
|
|
|
SetupLoggers(logWriter, signal.Interceptor{})
|
|
|
|
err := build.ParseAndSetDebugLevels("trace,PRXY=warn", logWriter)
|
|
require.NoError(t, err)
|
|
|
|
apertureCfg := &Config{
|
|
Insecure: true,
|
|
ListenAddr: testApertureAddress,
|
|
Authenticator: &AuthConfig{
|
|
Disable: true,
|
|
},
|
|
Etcd: &EtcdConfig{},
|
|
HashMail: &HashMailConfig{
|
|
Enabled: true,
|
|
MessageRate: time.Millisecond,
|
|
MessageBurstAllowance: math.MaxUint32,
|
|
},
|
|
}
|
|
aperture := NewAperture(apertureCfg)
|
|
errChan := make(chan error)
|
|
require.NoError(t, aperture.Start(errChan))
|
|
|
|
// Any error while starting?
|
|
select {
|
|
case err := <-errChan:
|
|
t.Fatalf("error starting aperture: %v", err)
|
|
default:
|
|
}
|
|
|
|
err = wait.NoError(func() error {
|
|
apertureAddr := fmt.Sprintf("http://%s/dummy",
|
|
testApertureAddress)
|
|
|
|
resp, err := http.Get(apertureAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusNotFound {
|
|
return fmt.Errorf("invalid status: %d", resp.StatusCode)
|
|
}
|
|
|
|
return nil
|
|
}, apertureStartTimeout)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func readMsgFromStream(t *testing.T,
|
|
client hashmailrpc.HashMailClient) (*hashmailrpc.CipherBox, error) {
|
|
|
|
ctxc, cancel := context.WithCancel(context.Background())
|
|
readStream, err := client.RecvStream(ctxc, testStreamDesc)
|
|
require.NoError(t, err)
|
|
|
|
// Wait a bit again to make sure the request is actually sent before our
|
|
// context is canceled already again.
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// We'll start a read on the stream in the background.
|
|
var (
|
|
goroutineStarted = make(chan struct{})
|
|
resultChan = make(chan *hashmailrpc.CipherBox)
|
|
errChan = make(chan error)
|
|
)
|
|
go func() {
|
|
close(goroutineStarted)
|
|
box, err := readStream.Recv()
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
resultChan <- box
|
|
}()
|
|
|
|
// Give the goroutine a chance to actually run, so block the main thread
|
|
// until it did.
|
|
<-goroutineStarted
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
// Now close and cancel the stream to make sure the server can clean it
|
|
// up and release it.
|
|
require.NoError(t, readStream.CloseSend())
|
|
cancel()
|
|
|
|
// Interpret the result.
|
|
select {
|
|
case err := <-errChan:
|
|
return nil, err
|
|
|
|
case box := <-resultChan:
|
|
return box, nil
|
|
}
|
|
}
|