mirror of
https://github.com/aljazceru/lspd.git
synced 2025-12-19 23:04:22 +01:00
refactor to allow start/stop
This commit is contained in:
@@ -3,7 +3,6 @@ package itest
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
@@ -11,7 +10,7 @@ import (
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"testing"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/breez/lntest"
|
||||
@@ -23,33 +22,112 @@ import (
|
||||
)
|
||||
|
||||
type PostgresContainer struct {
|
||||
id string
|
||||
password string
|
||||
port uint32
|
||||
cli *client.Client
|
||||
id string
|
||||
password string
|
||||
port uint32
|
||||
cli *client.Client
|
||||
logfile string
|
||||
isInitialized bool
|
||||
isStarted bool
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
func StartPostgresContainer(t *testing.T, ctx context.Context, logfile string) *PostgresContainer {
|
||||
cli, err := client.NewClientWithOpts(client.FromEnv)
|
||||
lntest.CheckError(t, err)
|
||||
|
||||
image := "postgres:15"
|
||||
_, _, err = cli.ImageInspectWithRaw(ctx, image)
|
||||
func NewPostgresContainer(logfile string) (*PostgresContainer, error) {
|
||||
port, err := lntest.GetPort()
|
||||
if err != nil {
|
||||
if !client.IsErrNotFound(err) {
|
||||
lntest.CheckError(t, err)
|
||||
}
|
||||
|
||||
pullReader, err := cli.ImagePull(ctx, image, types.ImagePullOptions{})
|
||||
lntest.CheckError(t, err)
|
||||
_, err = io.Copy(io.Discard, pullReader)
|
||||
pullReader.Close()
|
||||
lntest.CheckError(t, err)
|
||||
return nil, fmt.Errorf("could not get port: %w", err)
|
||||
}
|
||||
|
||||
port, err := lntest.GetPort()
|
||||
lntest.CheckError(t, err)
|
||||
createResp, err := cli.ContainerCreate(ctx, &container.Config{
|
||||
return &PostgresContainer{
|
||||
password: "pgpassword",
|
||||
port: port,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *PostgresContainer) Start(ctx context.Context) error {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
var err error
|
||||
if c.isStarted {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.cli, err = client.NewClientWithOpts(client.FromEnv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create docker client: %w", err)
|
||||
}
|
||||
|
||||
if !c.isInitialized {
|
||||
err := c.initialize(ctx)
|
||||
if err != nil {
|
||||
c.cli.Close()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = c.cli.ContainerStart(ctx, c.id, types.ContainerStartOptions{})
|
||||
if err != nil {
|
||||
c.cli.Close()
|
||||
return fmt.Errorf("failed to start docker container '%s': %w", c.id, err)
|
||||
}
|
||||
c.isStarted = true
|
||||
|
||||
HealthCheck:
|
||||
for {
|
||||
inspect, err := c.cli.ContainerInspect(ctx, c.id)
|
||||
if err != nil {
|
||||
c.cli.ContainerStop(ctx, c.id, nil)
|
||||
c.cli.Close()
|
||||
return fmt.Errorf("failed to inspect container '%s' during healthcheck: %w", c.id, err)
|
||||
}
|
||||
|
||||
status := inspect.State.Health.Status
|
||||
switch status {
|
||||
case "unhealthy":
|
||||
c.cli.ContainerStop(ctx, c.id, nil)
|
||||
c.cli.Close()
|
||||
return fmt.Errorf("container '%s' unhealthy", c.id)
|
||||
case "healthy":
|
||||
for {
|
||||
pgxPool, err := pgxpool.Connect(ctx, c.ConnectionString())
|
||||
if err == nil {
|
||||
pgxPool.Close()
|
||||
break HealthCheck
|
||||
}
|
||||
|
||||
<-time.After(50 * time.Millisecond)
|
||||
}
|
||||
default:
|
||||
<-time.After(200 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
go c.monitorLogs(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *PostgresContainer) initialize(ctx context.Context) error {
|
||||
image := "postgres:15"
|
||||
_, _, err := c.cli.ImageInspectWithRaw(ctx, image)
|
||||
if err != nil {
|
||||
if !client.IsErrNotFound(err) {
|
||||
return fmt.Errorf("could not find docker image '%s': %w", image, err)
|
||||
}
|
||||
|
||||
pullReader, err := c.cli.ImagePull(ctx, image, types.ImagePullOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to pull docker image '%s': %w", image, err)
|
||||
}
|
||||
defer pullReader.Close()
|
||||
|
||||
_, err = io.Copy(io.Discard, pullReader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download docker image '%s': %w", image, err)
|
||||
}
|
||||
}
|
||||
|
||||
createResp, err := c.cli.ContainerCreate(ctx, &container.Config{
|
||||
Image: image,
|
||||
Cmd: []string{
|
||||
"postgres",
|
||||
@@ -70,7 +148,7 @@ func StartPostgresContainer(t *testing.T, ctx context.Context, logfile string) *
|
||||
}, &container.HostConfig{
|
||||
PortBindings: nat.PortMap{
|
||||
"5432/tcp": []nat.PortBinding{
|
||||
{HostPort: strconv.FormatUint(uint64(port), 10)},
|
||||
{HostPort: strconv.FormatUint(uint64(c.port), 10)},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -78,48 +156,45 @@ func StartPostgresContainer(t *testing.T, ctx context.Context, logfile string) *
|
||||
nil,
|
||||
"",
|
||||
)
|
||||
lntest.CheckError(t, err)
|
||||
|
||||
err = cli.ContainerStart(ctx, createResp.ID, types.ContainerStartOptions{})
|
||||
lntest.CheckError(t, err)
|
||||
|
||||
ct := &PostgresContainer{
|
||||
id: createResp.ID,
|
||||
password: "pgpassword",
|
||||
port: port,
|
||||
cli: cli,
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create docker container: %w", err)
|
||||
}
|
||||
|
||||
HealthCheck:
|
||||
for {
|
||||
inspect, err := cli.ContainerInspect(ctx, createResp.ID)
|
||||
lntest.CheckError(t, err)
|
||||
|
||||
status := inspect.State.Health.Status
|
||||
switch status {
|
||||
case "unhealthy":
|
||||
lntest.CheckError(t, errors.New("container unhealthy"))
|
||||
case "healthy":
|
||||
for {
|
||||
pgxPool, err := pgxpool.Connect(context.Background(), ct.ConnectionString())
|
||||
if err == nil {
|
||||
pgxPool.Close()
|
||||
break HealthCheck
|
||||
}
|
||||
|
||||
<-time.After(50 * time.Millisecond)
|
||||
}
|
||||
default:
|
||||
<-time.After(200 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
go ct.monitorLogs(logfile)
|
||||
return ct
|
||||
c.id = createResp.ID
|
||||
c.isInitialized = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *PostgresContainer) monitorLogs(logfile string) {
|
||||
i, err := c.cli.ContainerLogs(context.Background(), c.id, types.ContainerLogsOptions{
|
||||
func (c *PostgresContainer) Stop(ctx context.Context) error {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
if !c.isStarted {
|
||||
return nil
|
||||
}
|
||||
|
||||
defer c.cli.Close()
|
||||
err := c.cli.ContainerStop(ctx, c.id, nil)
|
||||
c.isStarted = false
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *PostgresContainer) Cleanup(ctx context.Context) error {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
cli, err := client.NewClientWithOpts(client.FromEnv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cli.Close()
|
||||
return cli.ContainerRemove(ctx, c.id, types.ContainerRemoveOptions{
|
||||
Force: true,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *PostgresContainer) monitorLogs(ctx context.Context) {
|
||||
i, err := c.cli.ContainerLogs(ctx, c.id, types.ContainerLogsOptions{
|
||||
ShowStderr: true,
|
||||
ShowStdout: true,
|
||||
Timestamps: false,
|
||||
@@ -132,7 +207,7 @@ func (c *PostgresContainer) monitorLogs(logfile string) {
|
||||
}
|
||||
defer i.Close()
|
||||
|
||||
file, err := os.OpenFile(logfile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
|
||||
file, err := os.OpenFile(c.logfile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
|
||||
if err != nil {
|
||||
log.Printf("Could not create container log file: %v", err)
|
||||
return
|
||||
@@ -162,39 +237,31 @@ func (c *PostgresContainer) ConnectionString() string {
|
||||
return fmt.Sprintf("postgres://postgres:%s@127.0.0.1:%d/postgres", c.password, c.port)
|
||||
}
|
||||
|
||||
func (c *PostgresContainer) Shutdown(ctx context.Context) error {
|
||||
defer c.cli.Close()
|
||||
timeout := time.Second
|
||||
err := c.cli.ContainerStop(ctx, c.id, &timeout)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *PostgresContainer) Cleanup(ctx context.Context) error {
|
||||
cli, err := client.NewClientWithOpts(client.FromEnv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cli.Close()
|
||||
return cli.ContainerRemove(ctx, c.id, types.ContainerRemoveOptions{
|
||||
Force: true,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *PostgresContainer) RunMigrations(t *testing.T, ctx context.Context, migrationDir string) {
|
||||
func (c *PostgresContainer) RunMigrations(ctx context.Context, migrationDir string) error {
|
||||
filenames, err := filepath.Glob(filepath.Join(migrationDir, "*.up.sql"))
|
||||
lntest.CheckError(t, err)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to glob migration files: %w", err)
|
||||
}
|
||||
|
||||
sort.Strings(filenames)
|
||||
|
||||
pgxPool, err := pgxpool.Connect(context.Background(), c.ConnectionString())
|
||||
lntest.CheckError(t, err)
|
||||
pgxPool, err := pgxpool.Connect(ctx, c.ConnectionString())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to postgres: %w", err)
|
||||
}
|
||||
defer pgxPool.Close()
|
||||
|
||||
for _, filename := range filenames {
|
||||
data, err := os.ReadFile(filename)
|
||||
lntest.CheckError(t, err)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read migration file '%s': %w", filename, err)
|
||||
}
|
||||
|
||||
_, err = pgxPool.Exec(ctx, string(data))
|
||||
lntest.CheckError(t, err)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute migration file '%s': %w", filename, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user